Node.js Worker Reference

Node.js கிளஸ்டர் தொகுதியில் Worker பொருள்கள் மற்றும் பல-செயல்முறை பயன்பாடுகளை அறிக

Worker பொருள்

Worker வகுப்பு Node.js கிளஸ்டர் தொகுதியின் ஒரு பகுதியாகும், இது ஒரே நேரத்தில் இயங்கும் மற்றும் சேவையக போர்ட்களைப் பகிரும் சைல்ட் செயல்முறைகளை (வேலையாட்கள்) உருவாக்க இயலுமைப்படுத்துகிறது. சுமையைக் கையாள பல-கோர் அமைப்புகளின் பயனை எடுக்க இது குறிப்பாக பயனுள்ளதாக இருக்கிறது.

Worker ஐ இறக்குமதி செய்தல்

Worker பொருள்கள் கிளஸ்டர் தொகுதியைப் பயன்படுத்தும் போது தானாகவே உருவாக்கப்படுகின்றன:

// Workers are created via the cluster module
const cluster = require('cluster');

// To access a Worker object
if (cluster.isPrimary) {
  // Fork workers
  const worker = cluster.fork();
  
  // Now 'worker' is a Worker object
}

Worker பண்புகள்

பண்பு விளக்கம்
worker.id ஒவ்வொரு worker க்கும் ஒரு தனிப்பட்ட id ஒதுக்கப்படும்.
worker.process அனைத்து workers களும் child_process.fork() பயன்படுத்தி உருவாக்கப்படுகின்றன, மேலும் இந்த பண்பு அந்த அழைப்பின் முடிவைக் கொண்டுள்ளது.
worker.exitedAfterDisconnect worker .kill() அல்லது .disconnect() காரணமாக வெளியேறினால் இந்த பண்பு true ஆக இருக்கும், இல்லையெனில் இது undefined ஆக இருக்கும்.
worker.isConnected() worker அதன் முதன்மையுடன் இணைக்கப்பட்டிருந்தால் true ஐ வழங்குகிறது, இல்லையெனில் false.
worker.isDead() worker இன் செயல்முறை முடிவடைந்தால் (சிக்னல் அல்லது வெளியேறும் குறியீடு மூலம்) true ஐ வழங்குகிறது, இல்லையெனில் false.

Worker முறைகள்

முறை விளக்கம்
worker.disconnect() ஒரு worker இல், இந்த செயல்பாடு அனைத்து சேவையகங்களையும் மூடுகிறது, அந்த சேவையகங்களில் 'close' நிகழ்வுக்காக காத்திருக்கிறது, பின்னர் IPC சேனலைத் துண்டிக்கிறது.

முதன்மையில், ஒரு உள் செய்தி worker க்கு அனுப்பப்படுகிறது, அது தன்னைத் தானே .disconnect() அழைக்கக் காரணமாகிறது.
worker.kill([signal='SIGTERM']) worker செயல்முறையைக் கொல்கிறது. இந்த செயல்பாடு worker.process.kill() போன்றது. விருப்ப சிக்னல் அளவுரு worker க்கு என்ன சிக்னல் அனுப்ப வேண்டும் என்பதைக் குறிப்பிடுகிறது.
worker.send(message[, sendHandle[, options]][, callback]) 'message' நிகழ்வாகப் பெறப்படும் worker க்கு ஒரு செய்தியை அனுப்புகிறது. உள்நாட்டில் child_process.send() பயன்படுத்துகிறது.

Worker நிகழ்வுகள்

நிகழ்வு விளக்கம்
'disconnect' ஒரு worker IPC சேனல் துண்டிக்கப்பட்ட பிறகு வெளியிடப்படுகிறது. இது ஒரு worker நேர்த்தியாக வெளியேறும் போது, கொல்லப்படும் போது அல்லது கைமுறையாக துண்டிக்கப்படும் போது (worker.disconnect() பயன்படுத்தி) ஏற்படுகிறது.
'error' worker thread கைப்பற்றப்படாத விதிவிலக்கை வீசினால் வெளியிடப்படுகிறது.
'exit' worker செயல்முறை முடிவடையும் போது வெளியிடப்படுகிறது. கேட்போர் வாதங்களைப் பெறுகிறது (code, signal) இங்கு code என்பது வெளியேறும் குறியீடு மற்றும் signal என்பது செயல்முறையை முடிக்கக் காரணமான சிக்னலின் பெயர்.
'listening' ஒரு worker உள்ள ஒரு சேவையகம் இணைப்புகளுக்குக் கேட்கத் தொடங்கும் போது வெளியிடப்படுகிறது. கேட்போர் வாதங்களைப் பெறுகிறது (address) பயன்படுத்தப்படும் முகவரியைப் பற்றிய தகவலுடன்.
'message' ஒரு worker செய்தியைப் பெறும் போது வெளியிடப்படுகிறது. கேட்போர் வாதங்களைப் பெறுகிறது (message, handle) இங்கு message என்பது அனுப்பப்பட்ட செய்தி மற்றும் handle என்பது net.Socket அல்லது net.Server பொருள் அல்லது undefined.
'online' worker செயல்முறை forked மற்றும் செய்திகளைப் பெறத் தயாராக இருக்கும் போது வெளியிடப்படுகிறது.

அடிப்படை கிளஸ்டர் எடுத்துக்காட்டு

பல-செயல்முறை HTTP சேவையகத்தை உருவாக்க Worker பொருள்களுடன் கிளஸ்டரைப் பயன்படுத்துவதற்கான அடிப்படை எடுத்துக்காட்டு இங்கே:

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);

  // Fork workers
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  // Listen for dying workers
  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died with code: ${code} and signal: ${signal}`);
    console.log('Starting a new worker');
    cluster.fork();
  });
  
  // Event handlers for Worker objects
  cluster.on('fork', (worker) => {
    console.log(`Worker ${worker.id} (PID: ${worker.process.pid}) has been forked`);
  });
  
  cluster.on('online', (worker) => {
    console.log(`Worker ${worker.id} is online`);
  });
  
  cluster.on('listening', (worker, address) => {
    console.log(`Worker ${worker.id} is listening on ${address.address}:${address.port}`);
  });
  
  cluster.on('disconnect', (worker) => {
    console.log(`Worker ${worker.id} has disconnected`);
  });

} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`Hello from Worker ${process.pid}\n`);
  }).listen(8000);

  console.log(`Worker ${process.pid} started`);
}

Worker தகவல்தொடர்பு

முதன்மை செயல்முறை மற்றும் worker செயல்முறைகளுக்கு இடையே செய்திகளை அனுப்பலாம்:

const cluster = require('cluster');
const http = require('http');

if (cluster.isPrimary) {
  // Keep track of http requests
  let numRequests = 0;
  
  // Create two workers
  const worker1 = cluster.fork();
  const worker2 = cluster.fork();
  
  // Count requests
  function messageHandler(msg) {
    if (msg.cmd && msg.cmd === 'notifyRequest') {
      numRequests += 1;
      console.log(`Total requests: ${numRequests}`);
    }
  }
  
  // Listen for messages from workers
  worker1.on('message', messageHandler);
  worker2.on('message', messageHandler);
  
  // Send periodic messages to workers
  setInterval(() => {
    // Send a message to both workers
    worker1.send({ cmd: 'updateTime', time: Date.now() });
    worker2.send({ cmd: 'updateTime', time: Date.now() });
  }, 5000);

} else {
  // Worker process
  
  // Track the last update time
  let lastUpdate = Date.now();
  
  // Receive messages from the primary
  process.on('message', (msg) => {
    if (msg.cmd && msg.cmd === 'updateTime') {
      lastUpdate = msg.time;
      console.log(`Worker ${process.pid} received time update: ${new Date(lastUpdate)}`);
    }
  });
  
  // Create an HTTP server
  http.createServer((req, res) => {
    // Notify the primary about the request
    process.send({ cmd: 'notifyRequest' });
    
    // Respond to the request
    res.writeHead(200);
    res.end(`Hello from Worker ${process.pid}. Last update: ${new Date(lastUpdate)}\n`);
  }).listen(8000);
  
  console.log(`Worker ${process.pid} started`);
}

நேர்த்தியான மூடுதல்

Production பயன்பாடுகளுக்கு workers இன் நேர்த்தியான மூடுதலைக் கையாள்வது முக்கியமானது:

const cluster = require('cluster');
const http = require('http');

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);

  // Fork workers
  const numCPUs = require('os').cpus().length;
  const workers = [];
  
  for (let i = 0; i < numCPUs; i++) {
    workers.push(cluster.fork());
  }
  
  // Graceful shutdown function
  const shutdown = () => {
    console.log('Primary: starting graceful shutdown...');
    
    // Disconnect all workers
    for (const worker of workers) {
      console.log(`Disconnecting worker ${worker.id}`);
      worker.disconnect();
    }
    
    // Exit after a timeout if workers haven't exited
    setTimeout(() => {
      console.log('Primary: some workers did not exit, forcing shutdown');
      process.exit(1);
    }, 5000);
  };

  // Listen for worker events
  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died (${signal || code}). ` +
                `exitedAfterDisconnect: ${worker.exitedAfterDisconnect}`);
    
    // If it's a planned disconnect, don't restart
    if (!worker.exitedAfterDisconnect) {
      console.log('Worker died unexpectedly, replacing it...');
      workers.push(cluster.fork());
    }
    
    // Check if all workers are gone
    let activeWorkers = 0;
    for (const id in cluster.workers) {
      activeWorkers++;
    }
    
    console.log(`Active workers: ${activeWorkers}`);
    
    if (activeWorkers === 0) {
      console.log('All workers have exited, shutting down primary');
      process.exit(0);
    }
  });

  // Handle signals for graceful shutdown
  process.on('SIGTERM', shutdown);
  process.on('SIGINT', shutdown);

} else {
  // Worker process
  
  // Create a server
  const server = http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`Hello from worker ${process.pid}\n`);
  });
  
  server.listen(8000);
  
  console.log(`Worker ${process.pid} started`);
  
  // Handle disconnect signal from primary
  process.on('disconnect', () => {
    console.log(`Worker ${process.pid} disconnected, closing server...`);
    
    // Close the server
    server.close(() => {
      console.log(`Worker ${process.pid} closed server, exiting`);
      process.exit(0);
    });
    
    // Forcefully exit after timeout
    setTimeout(() => {
      console.log(`Worker ${process.pid} timed out closing server, forcing exit`);
      process.exit(1);
    }, 2000);
  });
}

Worker பூஜ்ஜிய-இடைநிறுத்தம் மீண்டும் தொடக்கம்

ரோலிங் worker புதுப்பிப்புகளுக்கான பூஜ்ஜிய-இடைநிறுத்தம் மீண்டும் தொடக்க முறையை செயல்படுத்துதல்:

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);

  // Fork initial workers
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  // Store worker refs
  let workers = Object.values(cluster.workers);

  // Restart one worker at a time
  function restartWorker(workerIndex) {
    const worker = workers[workerIndex];
    console.log(`Restarting worker #${worker.id}`);

    // Create a new worker
    const newWorker = cluster.fork();
    
    // Add the new worker to our array
    workers.push(newWorker);
    
    // When the new worker is online, disconnect the old worker
    newWorker.on('online', () => {
      if (worker) {
        console.log(`New worker #${newWorker.id} is online, disconnecting old worker #${worker.id}`);
        worker.disconnect();
      }
    });

    // When the old worker is disconnected, remove it from the array
    worker.on('disconnect', () => {
      console.log(`Worker #${worker.id} disconnected`);
      workers = workers.filter(w => w.id !== worker.id);
    });

    // Continue the process if there are more workers to restart
    if (workerIndex + 1 < workers.length) {
      setTimeout(() => {
        restartWorker(workerIndex + 1);
      }, 5000);
    }
  }

  // Example: trigger a rolling restart after 15 seconds
  setTimeout(() => {
    console.log('Starting rolling restart of workers...');
    restartWorker(0);
  }, 15000);

  // Additional event handlers
  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} exited with code ${code}`);
  });

} else {
  // Worker process
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`Hello from worker ${process.pid}, started at ${new Date().toISOString()}\n`);
  }).listen(8000);

  console.log(`Worker ${process.pid} started`);
}

Worker நிலை கண்காணிப்பு

Worker நிலையைக் கண்காணித்தல் மற்றும் அளவீடுகளைச் சேகரித்தல்:

const cluster = require('cluster');
const http = require('http');
const os = require('os');

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);

  // Fork workers
  const workers = [];
  for (let i = 0; i < os.cpus().length; i++) {
    workers.push(cluster.fork());
  }

  // Store metrics for each worker
  const workerMetrics = {};

  // Set up metrics collection
  for (const worker of workers) {
    workerMetrics[worker.id] = {
      id: worker.id,
      pid: worker.process.pid,
      requests: 0,
      errors: 0,
      lastActive: Date.now(),
      memoryUsage: {}
    };

    // Handle messages from workers
    worker.on('message', (msg) => {
      if (msg.type === 'metrics') {
        // Update metrics
        workerMetrics[worker.id] = {
          ...workerMetrics[worker.id],
          ...msg.data,
          lastActive: Date.now()
        };
      }
    });
  }

  // Create an HTTP server for monitoring
  http.createServer((req, res) => {
    if (req.url === '/metrics') {
      res.writeHead(200, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify({
        workers: Object.values(workerMetrics),
        system: {
          loadAvg: os.loadavg(),
          totalMem: os.totalmem(),
          freeMem: os.freemem(),
          uptime: os.uptime()
        }
      }, null, 2));
    } else {
      res.writeHead(404);
      res.end('Not found');
    }
  }).listen(8001);

  console.log('Primary: Monitoring server running on port 8001');

  // Check for unresponsive workers
  setInterval(() => {
    const now = Date.now();
    
    for (const worker of workers) {
      const metrics = workerMetrics[worker.id];
      
      // If worker hasn't reported in 30 seconds
      if (now - metrics.lastActive > 80800) {
        console.warn(`Worker ${worker.id} appears unresponsive, restarting...`);
        
        // Kill the unresponsive worker
        worker.kill();
        
        // Fork a replacement
        const newWorker = cluster.fork();
        
        // Set up metrics for new worker
        workerMetrics[newWorker.id] = {
          id: newWorker.id,
          pid: newWorker.process.pid,
          requests: 0,
          errors: 0,
          lastActive: Date.now(),
          memoryUsage: {}
        };
        
        // Replace in workers array
        const index = workers.indexOf(worker);
        if (index !== -1) {
          workers[index] = newWorker;
        }
        
        // Clean up old metrics
        delete workerMetrics[worker.id];
      }
    }
  }, 10000);

} else {
  // Worker process
  console.log(`Worker ${process.pid} started`);
  
  // Track metrics
  let requestCount = 0;
  let errorCount = 0;
  
  // Report metrics to primary every 5 seconds
  setInterval(() => {
    process.send({
      type: 'metrics',
      data: {
        requests: requestCount,
        errors: errorCount,
        memoryUsage: process.memoryUsage()
      }
    });
  }, 5000);

  // Create HTTP server
  http.createServer((req, res) => {
    requestCount++;
    
    try {
      res.writeHead(200);
      res.end(`Hello from worker ${process.pid}\n`);
    } catch (error) {
      errorCount++;
      console.error(`Worker ${process.pid} error:`, error);
    }
  }).listen(8000);
}

Worker சிறந்த நடைமுறைகள்

1. நிலை தனிமைப்படுத்தலை உறுதி செய்யுங்கள்

Worker செயல்முறைகளை நிலையற்றதாக வைத்திருங்கள் அல்லது சரியான நிலை மேலாண்மையை உறுதி செய்யுங்கள்:

// BAD - State shared across forked processes won't work as expected
let requestCount = 0;

// GOOD - Each worker has its own isolated state
if (cluster.isPrimary) {
  // Primary logic
} else {
  // Worker-specific state
  let workerRequestCount = 0;
}

2. எதிர்பாராத Worker முடிவைக் கையாளுங்கள்

if (cluster.isPrimary) {
  cluster.on('exit', (worker, code, signal) => {
    if (code !== 0 && !worker.exitedAfterDisconnect) {
      console.log(`Worker ${worker.id} crashed. Restarting...`);
      cluster.fork();
    }
  });
}

3. Worker Sticky Sessions பயன்படுத்தவும்

const cluster = require('cluster');
const http = require('http');

if (cluster.isPrimary) {
  // Setup sticky session logic
  cluster.schedulingPolicy = cluster.SCHED_NONE;
  
  // Start workers
  const numCPUs = require('os').cpus().length;
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
  
  // Create routes based on connection's remote IP
  cluster.on('connection', (connection, address) => {
    // Calculate which worker gets connection based on IP
    const worker = Object.values(cluster.workers)[
      Number(address.toString().split(':')[3]) % Object.keys(cluster.workers).length
    ];
    worker.send('sticky-session:connection', connection);
  });
} else {
  // Worker code
  http.createServer((req, res) => {
    res.end(`Handled by worker ${process.pid}`);
  }).listen(8000, () => {
    console.log(`Worker ${process.pid} listening`);
  });
  
  // Receive sticky connections
  process.on('message', (message, connection) => {
    if (message !== 'sticky-session:connection') return;
    
    // Emulate a connection event on the server
    server.emit('connection', connection);
    connection.resume();
  });
}

பயிற்சி

Node.js கிளஸ்டர் தொகுதியில் worker செயல்முறையை உருவாக்க எந்த முறை பயன்படுத்தப்படுகிறது? தேர்வு செய்யவும்.

cluster.create()
✗ தவறு! cluster.create() என்பது கிளஸ்டர் தொகுதியில் ஒரு செல்லுபடியாகும் முறை அல்ல
cluster.fork()
✓ சரி! cluster.fork() முறை புதிய worker செயல்முறைகளை உருவாக்க பயன்படுகிறது
cluster.spawn()
✗ தவறு! cluster.spawn() என்பது கிளஸ்டர் தொகுதியில் ஒரு செல்லுபடியாகும் முறை அல்ல
worker.start()
✗ தவறு! worker.start() என்பது கிளஸ்டர் தொகுதியில் ஒரு செல்லுபடியாகும் முறை அல்ல