Node.js Streams

Node.js இல் தரவை திறம்பட கையாள ஸ்ட்ரீம்களைப் பயன்படுத்த கற்றுக்கொள்ளுங்கள்

ஸ்ட்ரீம்கள் என்றால் என்ன?

Node.js இல், ஸ்ட்ரீம்கள் என்பது தரவின் தொகுப்புகள் ஆகும், அவை ஒரே நேரத்தில் முழுமையாக கிடைக்காமல் இருக்கலாம் மற்றும் நினைவகத்தில் பொருந்த வேண்டியதில்லை.

ஒரு இடத்திலிருந்து மற்றொரு இடத்திற்கு தரவை நகர்த்தும் கன்வேயர் பெல்ட்களாக அவற்றை நினைத்துப் பாருங்கள், முழுத் தரவுத் தொகுப்புக்காகக் காத்திருக்காமல் ஒவ்வொரு துண்டும் வரும் போதே வேலை செய்ய உங்களை அனுமதிக்கிறது.

ஸ்ட்ரீம்கள் Node.js இன் மிகவும் சக்திவாய்ந்த அம்சங்களில் ஒன்றாகும் மற்றும் பரவலாகப் பயன்படுத்தப்படுகின்றன:

கோப்பு முறைமை செயல்பாடுகள்

கோப்புகளைப் படித்தல்/எழுதுதல்

HTTP கோரிக்கைகள் மற்றும் பதில்கள்

வலை கோரிக்கைகள் மற்றும் பதில்களை கையாளுதல்

தரவு சுருக்கம் மற்றும் விரிவாக்கம்

தரவை சுருக்குதல் மற்றும் விரிவுபடுத்துதல்

தரவுத்தள செயல்பாடுகள்

தரவுத்தள செயல்பாடுகளை கையாளுதல்

நிகழ்நேர தரவு செயலாக்கம்

நிகழ்நேர தரவு பரிமாற்றம் மற்றும் செயலாக்கம்

ஸ்ட்ரீம்களுடன் தொடங்குதல்

ஸ்ட்ரீம்கள் Node.js இல் தரவை திறம்பட கையாளுவதற்கான அடிப்படை கருத்துக்களில் ஒன்றாகும்.

அவை எல்லாவற்றையும் ஒரே நேரத்தில் நினைவகத்தில் ஏற்றுவதற்குப் பதிலாக, தரவு கிடைக்கும் போது அதைத் துண்டுகளாக செயலாக்க உங்களை அனுமதிக்கின்றன.

அடிப்படை ஸ்ட்ரீம் எடுத்துக்காட்டு

const fs = require('fs');

// Create a readable stream from a file
const readableStream = fs.createReadStream('input.txt', 'utf8');
// Create a writable stream to a file
const writableStream = fs.createWriteStream('output.txt');

// Pipe the data from readable to writable stream
readableStream.pipe(writableStream);

// Handle completion and errors
writableStream.on('finish', () => {
  console.log('File copy completed!');
});

readableStream.on('error', (err) => {
  console.error('Error reading file:', err);
});

writableStream.on('error', (err) => {
  console.error('Error writing file:', err);
});

ஏன் ஸ்ட்ரீம்களைப் பயன்படுத்த வேண்டும்?

ஸ்ட்ரீம்களைப் பயன்படுத்த பல நன்மைகள் உள்ளன:

நினைவக திறன்: பெரிய கோப்புகளை முழுமையாக நினைவகத்தில் ஏற்றாமல் செயலாக்கவும்
நேர திறன்: அனைத்து தரவுக்காகக் காத்திருக்காமல், தரவு கிடைக்கும் விரைவில் செயலாக்கத் தொடங்கவும்
கூட்டுத்தன்மை: ஸ்ட்ரீம்களை இணைப்பதன் மூலம் சக்திவாய்ந்த தரவு குழாய்களை உருவாக்கவும்
சிறந்த பயனர் அனுபவம்: தரவு கிடைக்கும் போது பயனர்களுக்கு வழங்கவும் (எ.கா., வீடியோ ஸ்ட்ரீமிங்)

💡 ஸ்ட்ரீம்களின் நன்மைகள்:

512MB RAM உள்ள ஒரு சேவையகத்தில் 1GB கோப்பைப் படிப்பதை கற்பனை செய்து பாருங்கள்:

  • ஸ்ட்ரீம்கள் இல்லாமல்: முழு கோப்பையும் நினைவகத்தில் ஏற்ற முயற்சிக்கும் போது செயல்முறை கிராஷ் ஆகும்
  • ஸ்ட்ரீம்களுடன்: நீங்கள் கோப்பை சிறிய துண்டுகளாக (எ.கா., ஒரு நேரத்தில் 64KB) செயலாக்குகிறீர்கள்

முக்கிய ஸ்ட்ரீம் வகைகள்

Node.js நான்கு அடிப்படை வகை ஸ்ட்ரீம்களை வழங்குகிறது, ஒவ்வொன்றும் தரவு கையாளுதலில் ஒரு குறிப்பிட்ட நோக்கத்திற்கு சேவை செய்கின்றன:

ஸ்ட்ரீம் வகை விளக்கம் பொதுவான எடுத்துக்காட்டுகள்
Readable தரவைப் படிக்கக்கூடிய ஸ்ட்ரீம்கள் (தரவு மூலம்) fs.createReadStream(), HTTP பதில்கள், process.stdin
Writable தரவை எழுதக்கூடிய ஸ்ட்ரீம்கள் (தரவு இலக்கு) fs.createWriteStream(), HTTP கோரிக்கைகள், process.stdout
Duplex Readable மற்றும் Writable இரண்டுமான ஸ்ட்ரீம்கள் TCP சாக்கெட்டுகள், Zlib ஸ்ட்ரீம்கள்
Transform தரவு எழுதப்பட்டு படிக்கப்படும் போது அதை மாற்றியமைக்கக்கூடிய Duplex ஸ்ட்ரீம்கள் Zlib ஸ்ட்ரீம்கள், crypto ஸ்ட்ரீம்கள்

⚠️ குறிப்பு:

Node.js இல் உள்ள அனைத்து ஸ்ட்ரீம்களும் EventEmitter இன் நிகழ்வுகளாகும், அதாவது அவை கேட்கக்கூடிய மற்றும் கையாளக்கூடிய நிகழ்வுகளை உமிழ்கின்றன.

Readable ஸ்ட்ரீம்கள்

Readable ஸ்ட்ரீம்கள் ஒரு மூலத்திலிருந்து தரவைப் படிக்க உங்களை அனுமதிக்கின்றன. எடுத்துக்காட்டுகள்:

Readable ஸ்ட்ரீம் உருவாக்குதல்

const fs = require('fs');

// Create a readable stream from a file
const readableStream = fs.createReadStream('myfile.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 64KB chunks
});

// Events for readable streams
readableStream.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  console.log(chunk);
});

readableStream.on('end', () => {
  console.log('No more data to read.');
});

readableStream.on('error', (err) => {
  console.error('Error reading from stream:', err);
});

படிக்கும் முறைகள்

Readable ஸ்ட்ரீம்கள் இரண்டு முறைகளில் ஒன்றில் செயல்படுகின்றன:

const fs = require('fs');

// Paused mode example
const readableStream = fs.createReadStream('myfile.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 64KB chunks
});

// Manually consume the stream using read()
readableStream.on('readable', () => {
  let chunk;
  while (null !== (chunk = readableStream.read())) {
    console.log(`Read ${chunk.length} bytes of data.`);
    console.log(chunk);
  }
});

readableStream.on('end', () => {
  console.log('No more data to read.');
});

Writable ஸ்ட்ரீம்கள்

Writable ஸ்ட்ரீம்கள் ஒரு இலக்குக்கு தரவை எழுத உங்களை அனுமதிக்கின்றன. எடுத்துக்காட்டுகள்:

Writable ஸ்ட்ரீம் உருவாக்குதல்

const fs = require('fs');

// Create a writable stream to a file
const writableStream = fs.createWriteStream('output.txt');

// Write data to the stream
writableStream.write('Hello, ');
writableStream.write('World!');
writableStream.write('\nWriting to a stream is easy!');

// End the stream
writableStream.end();

// Events for writable streams
writableStream.on('finish', () => {
  console.log('All data has been written to the file.');
});

writableStream.on('error', (err) => {
  console.error('Error writing to stream:', err);
});

பின்னழுத்தத்தைக் கையாளுதல்

ஒரு ஸ்ட்ரீமில் எழுதும் போது, தரவு செயலாக்கப்படுவதை விட வேகமாக எழுதப்பட்டால், பின்னழுத்தம் ஏற்படுகிறது.

write() முறை தொடர்ந்து எழுதுவது பாதுகாப்பானதா என்பதைக் குறிக்கும் பூலியன் மதிப்பை வழங்குகிறது.

const fs = require('fs');

const writableStream = fs.createWriteStream('output.txt');

function writeData() {
  let i = 100;
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        // Last time, close the stream
        writableStream.write('Last chunk!\n');
        writableStream.end();
      } else {
        // Continue writing data
        const data = `Data chunk ${i}\n`;
        // Write and check if we should continue
        ok = writableStream.write(data);
      }
    }
    while (i > 0 && ok);

    if (i > 0) {
      // We need to wait for the drain event before writing more
      writableStream.once('drain', write);
    }
  }
  write();
}

writeData();
writableStream.on('finish', () => {
  console.log('All data written successfully.');
});

Pipe

pipe() முறை ஒரு readable ஸ்ட்ரீமை writable ஸ்ட்ரீமுடன் இணைக்கிறது, தரவின் ஓட்டத்தை தானாக மேலாண்மை செய்கிறது மற்றும் பின்னழுத்தத்தைக் கையாளுகிறது.

இது ஸ்ட்ரீம்களை நுகர எளிதான வழியாகும்.

const fs = require('fs');

// Create readable and writable streams
const readableStream = fs.createReadStream('source.txt');
const writableStream = fs.createWriteStream('destination.txt');

// Pipe the readable stream to the writable stream
readableStream.pipe(writableStream);

// Handle completion and errors
readableStream.on('error', (err) => {
  console.error('Read error:', err);
});

writableStream.on('error', (err) => {
  console.error('Write error:', err);
});

writableStream.on('finish', () => {
  console.log('File copy completed!');
});

சங்கிலி Pipes

நீங்கள் pipe() ஐப் பயன்படுத்தி பல ஸ்ட்ரீம்களை ஒன்றாக இணைக்கலாம்.

இது transform ஸ்ட்ரீம்களுடன் பணிபுரியும் போது குறிப்பாக பயனுள்ளதாக இருக்கும்.

const fs = require('fs');
const zlib = require('zlib');

// Create a pipeline to read a file, compress it, and write to a new file
fs.createReadStream('source.txt')
  .pipe(zlib.createGzip()) // Compress the data
  .pipe(fs.createWriteStream('destination.txt.gz'))
  .on('finish', () => {
    console.log('File compressed successfully!');
  });

💡 குறிப்பு:

pipe() முறை இலக்கு ஸ்ட்ரீமை வழங்குகிறது, இது சங்கிலியை இயக்கும்.

Duplex மற்றும் Transform ஸ்ட்ரீம்கள்

Duplex ஸ்ட்ரீம்கள்

Duplex ஸ்ட்ரீம்கள் இரண்டும் readable மற்றும் writable ஆகும், இருவழி குழாய் போன்றது.

ஒரு TCP சாக்கெட் duplex ஸ்ட்ரீமின் நல்ல எடுத்துக்காட்டாகும்.

const net = require('net');

// Create a TCP server
const server = net.createServer((socket) => {
  // 'socket' is a duplex stream

  // Handle incoming data (readable side)
  socket.on('data', (data) => {
    console.log('Received:', data.toString());

    // Echo back (writable side)
    socket.write(`Echo: ${data}`);
  });

  socket.on('end', () => {
    console.log('Client disconnected');
  });
});

server.listen(8080, () => {
  console.log('Server listening on port 8080');
});

// To test, you can use a tool like netcat or telnet:
// $ nc localhost 8080
// or create a client:
/*
const client = net.connect({ port: 8080 }, () => {
  console.log('Connected to server');
  client.write('Hello from client!');
});

client.on('data', (data) => {
  console.log('Server says:', data.toString());
  client.end(); // Close the connection
});
*/

Transform ஸ்ட்ரீம்கள்

Transform ஸ்ட்ரீம்கள் duplex ஸ்ட்ரீம்கள் ஆகும், அவை அதன் வழியாக செல்லும் போது தரவை மாற்ற முடியும்.

அவை குழாய்களில் தரவை செயலாக்குவதற்கு சிறந்தவை.

const { Transform } = require('stream');
const fs = require('fs');

// Create a transform stream that converts text to uppercase
class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // Transform the chunk to uppercase
    const upperChunk = chunk.toString().toUpperCase();
    // Push the transformed data
    this.push(upperChunk);
    // Signal that we're done with this chunk
    callback();
  }
}

// Create an instance of our transform stream
const uppercaseTransform = new UppercaseTransform();

// Create a readable stream from a file
const readableStream = fs.createReadStream('input.txt');

// Create a writable stream to a file
const writableStream = fs.createWriteStream('output-uppercase.txt');

// Pipe the data through our transform stream
readableStream
  .pipe(uppercaseTransform)
  .pipe(writableStream)
  .on('finish', () => {
    console.log('Transformation completed!');
  });

ஸ்ட்ரீம் நிகழ்வுகள்

அனைத்து ஸ்ட்ரீம்களும் EventEmitter இன் நிகழ்வுகளாகும் மற்றும் பல நிகழ்வுகளை உமிழ்கின்றன:

Readable ஸ்ட்ரீம் நிகழ்வுகள்

Writable ஸ்ட்ரீம் நிகழ்வுகள்

stream.pipeline() முறை

pipeline() செயல்பாடு (Node.js v10.0.0 முதல் கிடைக்கும்) ஸ்ட்ரீம்களை ஒன்றாக இணைக்க மிகவும் உறுதியான வழியாகும், குறிப்பாக பிழை கையாளுதலுக்கு.

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// Create a pipeline that handles errors properly
pipeline(
  fs.createReadStream('source.txt'),
  zlib.createGzip(),
  fs.createWriteStream('destination.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded!');
    }
  }
);

💡 குறிப்பு:

pipeline() அவற்றில் ஏதேனும் பிழை ஏற்பட்டால் அனைத்து ஸ்ட்ரீம்களையும் சரியாக சுத்தம் செய்யும், சாத்தியமான நினைவக கசிவுகளைத் தடுக்கும்.

Object Mode ஸ்ட்ரீம்கள்

இயல்பாக, ஸ்ட்ரீம்கள் சரங்கள் மற்றும் Buffer பொருள்களுடன் வேலை செய்கின்றன.

இருப்பினும், ஸ்ட்ரீம்கள் JavaScript பொருள்களுடன் வேலை செய்ய 'object mode' க்கு அமைக்கப்படலாம்.

const { Readable, Writable, Transform } = require('stream');

// Create a readable stream in object mode
const objectReadable = new Readable({
  objectMode: true,
  read() {} // Implementation required but can be no-op
});

// Create a transform stream in object mode
const objectTransform = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    // Add a property to the object
    chunk.transformed = true;
    chunk.timestamp = new Date();
    this.push(chunk);
    callback();
  }
});

// Create a writable stream in object mode
const objectWritable = new Writable({
  objectMode: true,
  write(chunk, encoding, callback) {
    console.log('Received object:', chunk);
    callback();
  }
});

// Connect the streams
objectReadable
  .pipe(objectTransform)
  .pipe(objectWritable);

// Push some objects to the stream
objectReadable.push({ name: 'Object 1', value: 10 });
objectReadable.push({ name: 'Object 2', value: 20 });
objectReadable.push({ name: 'Object 3', value: 30 });
objectReadable.push(null); // Signal the end of data

மேம்பட்ட ஸ்ட்ரீம் முறைகள்

1. pipeline() உடன் பிழை கையாளுதல்

pipeline() முறை ஸ்ட்ரீம் சங்கிலிகளில் பிழைகளைக் கையாள பரிந்துரைக்கப்படும் வழியாகும்:

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('output.txt.gz'),
  (err) => {
   if (err) {
    console.error('Pipeline failed:', err);
   } else {
    console.log('Pipeline succeeded');
   }
  }
);

2. Object Mode ஸ்ட்ரீம்கள்

ஸ்ட்ரீம்கள் சரங்கள் மற்றும் buffers மட்டுமல்ல JavaScript பொருள்களுடன் வேலை செய்ய முடியும்:

const { Readable } = require('stream');

// Create a readable stream in object mode
const objectStream = new Readable({
  objectMode: true,
  read() {}
});
// Push objects to the stream
objectStream.push({ id: 1, name: 'Alice' });
objectStream.push({ id: 2, name: 'Bob' });
objectStream.push(null); // Signal end of stream
// Consume the stream
objectStream.on('data', (obj) => {
  console.log('Received:', obj);
});

நடைமுறை எடுத்துக்காட்டுகள்

HTTP ஸ்ட்ரீமிங்

ஸ்ட்ரீம்கள் HTTP கோரிக்கைகள் மற்றும் பதில்களில் பரவலாகப் பயன்படுத்தப்படுகின்றன.

const http = require('http');
const fs = require('fs');

// Create an HTTP server
const server = http.createServer((req, res) => {
  // Handle different routes
  if (req.url === '/') {
    // Send a simple response
    res.writeHead(200, { 'Content-Type': 'text/html' });
    res.end('

Stream Demo

Try streaming a file or streaming a video.

'); } else if (req.url === '/file') { // Stream a large text file res.writeHead(200, { 'Content-Type': 'text/plain' }); const fileStream = fs.createReadStream('largefile.txt', 'utf8'); // Pipe the file to the response (handles backpressure automatically) fileStream.pipe(res); // Handle errors fileStream.on('error', (err) => { console.error('File stream error:', err); res.statusCode = 500; res.end('Server Error'); }); } else if (req.url === '/video') { // Stream a video file with proper headers const videoPath = 'video.mp4'; const stat = fs.statSync(videoPath); const fileSize = stat.size; const range = req.headers.range; if (range) { // Handle range requests for video seeking const parts = range.replace(/bytes=/, "").split("-"); const start = parseInt(parts[0], 10); const end = parts[1] ? parseInt(parts[1], 10) : fileSize - 1; const chunksize = (end - start) + 1; const videoStream = fs.createReadStream(videoPath, { start, end }); res.writeHead(206, { 'Content-Range': `bytes ${start}-${end}/${fileSize}`, 'Accept-Ranges': 'bytes', 'Content-Length': chunksize, 'Content-Type': 'video/mp4' }); videoStream.pipe(res); } else { // No range header, send entire video res.writeHead(200, { 'Content-Length': fileSize, 'Content-Type': 'video/mp4' }); fs.createReadStream(videoPath).pipe(res); } } else { // 404 Not Found res.writeHead(404, { 'Content-Type': 'text/plain' }); res.end('Not Found'); } }); // Start the server server.listen(8080, () => { console.log('Server running at http://localhost:8080/'); });

பெரிய CSV கோப்புகளை செயலாக்குதல்

const fs = require('fs');
const { Transform } = require('stream');
const csv = require('csv-parser'); // npm install csv-parser

// Create a transform stream to filter and transform CSV data
const filterTransform = new Transform({
  objectMode: true,
  transform(row, encoding, callback) {
    // Only pass through rows that meet our criteria
    if (parseInt(row.age) > 18) {
      // Modify the row
      row.isAdult = 'Yes';
      // Push the transformed row
      this.push(row);
    }
    callback();
  }
});

// Create a writable stream for the results
const results = [];
const writeToArray = new Transform({
  objectMode: true,
  transform(row, encoding, callback) {
    results.push(row);
    callback();
  }
});

// Create the processing pipeline
fs.createReadStream('people.csv')
  .pipe(csv())
  .pipe(filterTransform)
  .pipe(writeToArray)
  .on('finish', () => {
    console.log(`Processed ${results.length} records:`);
    console.log(results);
  })
  .on('error', (err) => {
    console.error('Error processing CSV:', err);
  });

சிறந்த நடைமுறைகள்

பிழை கையாளுதல்: பயன்பாடு கிராஷ் ஆவதைத் தடுக்க ஸ்ட்ரீம்களில் error நிகழ்வுகளை எப்போதும் கையாளவும்
pipeline() பயன்படுத்தவும்: சிறந்த பிழை கையாளுதல் மற்றும் சுத்தம் செய்ய stream.pipeline() ஐ .pipe() க்கு முன்னுரிமை அளிக்கவும்
பின்னழுத்தத்தைக் கையாளவும்: நினைவக சிக்கல்களைத் தவிர்க்க write() இன் வருமான மதிப்பை மதிக்கவும்
ஸ்ட்ரீம்களை முடிக்கவும்: நீங்கள் முடித்தவுடன் writable ஸ்ட்ரீம்களில் எப்போதும் end() ஐ அழைக்கவும்
ஒத்திசைவான செயல்பாடுகளைத் தவிர்க்கவும்: ஸ்ட்ரீம் கையாளுநர்களுக்குள் ஒத்திசைவான செயல்பாடுகளுடன் நிகழ்வு சுழற்சியைத் தடுக்காதீர்கள்
Buffer அளவு: highWaterMark (buffer அளவு) அமைப்புகளைப் பற்றி கவனத்துடன் இருங்கள்

⚠️ எச்சரிக்கை:

ஸ்ட்ரீம்களை தவறாக கையாளுதல் நினைவக கசிவுகள் மற்றும் செயல்திறன் சிக்கல்களுக்கு வழிவகுக்கும்.

பிழைகளை எப்போதும் கையாளுங்கள் மற்றும் ஸ்ட்ரீம்களை சரியாக முடிக்கவும்.

சுருக்கம்

ஸ்ட்ரீம்கள் Node.js இல் ஒரு அடிப்படை கருத்தாகும், இது திறந்த தரவு கையாளுதலுக்கு அனுமதிக்கிறது. அவை:

பயிற்சி

ஒரு readable ஸ்ட்ரீமை writable ஸ்ட்ரீமுடன் இணைக்க எந்த முறை பயன்படுத்தப்படுகிறது?

connect()
✗ தவறு! "connect()" என்பது Node.js ஸ்ட்ரீம்களில் ஒரு செல்லுபடியாகும் முறை அல்ல
pipe()
✓ சரி! "pipe()" முறை ஒரு readable ஸ்ட்ரீமை writable ஸ்ட்ரீமுடன் இணைக்க பயன்படும் சரியான முறையாகும்
link()
✗ தவறு! "link()" என்பது Node.js ஸ்ட்ரீம்களில் ஒரு செல்லுபடியாகும் முறை அல்ல
flow()
✗ தவறு! "flow()" என்பது Node.js ஸ்ட்ரீம்களில் ஒரு செல்லுபடியாகும் முறை அல்ல