Worker பொருள்
Worker வகுப்பு Node.js கிளஸ்டர் தொகுதியின் ஒரு பகுதியாகும், இது ஒரே நேரத்தில் இயங்கும் மற்றும் சேவையக போர்ட்களைப் பகிரும் சைல்ட் செயல்முறைகளை (வேலையாட்கள்) உருவாக்க இயலுமைப்படுத்துகிறது. சுமையைக் கையாள பல-கோர் அமைப்புகளின் பயனை எடுக்க இது குறிப்பாக பயனுள்ளதாக இருக்கிறது.
Worker ஐ இறக்குமதி செய்தல்
Worker பொருள்கள் கிளஸ்டர் தொகுதியைப் பயன்படுத்தும் போது தானாகவே உருவாக்கப்படுகின்றன:
// Workers are created via the cluster module
const cluster = require('cluster');
// To access a Worker object
if (cluster.isPrimary) {
// Fork workers
const worker = cluster.fork();
// Now 'worker' is a Worker object
}
Worker பண்புகள்
| பண்பு | விளக்கம் |
|---|---|
| worker.id | ஒவ்வொரு worker க்கும் ஒரு தனிப்பட்ட id ஒதுக்கப்படும். |
| worker.process | அனைத்து workers களும் child_process.fork() பயன்படுத்தி உருவாக்கப்படுகின்றன, மேலும் இந்த பண்பு அந்த அழைப்பின் முடிவைக் கொண்டுள்ளது. |
| worker.exitedAfterDisconnect | worker .kill() அல்லது .disconnect() காரணமாக வெளியேறினால் இந்த பண்பு true ஆக இருக்கும், இல்லையெனில் இது undefined ஆக இருக்கும். |
| worker.isConnected() | worker அதன் முதன்மையுடன் இணைக்கப்பட்டிருந்தால் true ஐ வழங்குகிறது, இல்லையெனில் false. |
| worker.isDead() | worker இன் செயல்முறை முடிவடைந்தால் (சிக்னல் அல்லது வெளியேறும் குறியீடு மூலம்) true ஐ வழங்குகிறது, இல்லையெனில் false. |
Worker முறைகள்
| முறை | விளக்கம் |
|---|---|
| worker.disconnect() | ஒரு worker இல், இந்த செயல்பாடு அனைத்து சேவையகங்களையும் மூடுகிறது, அந்த சேவையகங்களில் 'close' நிகழ்வுக்காக காத்திருக்கிறது, பின்னர் IPC சேனலைத் துண்டிக்கிறது. முதன்மையில், ஒரு உள் செய்தி worker க்கு அனுப்பப்படுகிறது, அது தன்னைத் தானே .disconnect() அழைக்கக் காரணமாகிறது. |
| worker.kill([signal='SIGTERM']) | worker செயல்முறையைக் கொல்கிறது. இந்த செயல்பாடு worker.process.kill() போன்றது. விருப்ப சிக்னல் அளவுரு worker க்கு என்ன சிக்னல் அனுப்ப வேண்டும் என்பதைக் குறிப்பிடுகிறது. |
| worker.send(message[, sendHandle[, options]][, callback]) | 'message' நிகழ்வாகப் பெறப்படும் worker க்கு ஒரு செய்தியை அனுப்புகிறது. உள்நாட்டில் child_process.send() பயன்படுத்துகிறது. |
Worker நிகழ்வுகள்
| நிகழ்வு | விளக்கம் |
|---|---|
| 'disconnect' | ஒரு worker IPC சேனல் துண்டிக்கப்பட்ட பிறகு வெளியிடப்படுகிறது. இது ஒரு worker நேர்த்தியாக வெளியேறும் போது, கொல்லப்படும் போது அல்லது கைமுறையாக துண்டிக்கப்படும் போது (worker.disconnect() பயன்படுத்தி) ஏற்படுகிறது. |
| 'error' | worker thread கைப்பற்றப்படாத விதிவிலக்கை வீசினால் வெளியிடப்படுகிறது. |
| 'exit' | worker செயல்முறை முடிவடையும் போது வெளியிடப்படுகிறது. கேட்போர் வாதங்களைப் பெறுகிறது (code, signal) இங்கு code என்பது வெளியேறும் குறியீடு மற்றும் signal என்பது செயல்முறையை முடிக்கக் காரணமான சிக்னலின் பெயர். |
| 'listening' | ஒரு worker உள்ள ஒரு சேவையகம் இணைப்புகளுக்குக் கேட்கத் தொடங்கும் போது வெளியிடப்படுகிறது. கேட்போர் வாதங்களைப் பெறுகிறது (address) பயன்படுத்தப்படும் முகவரியைப் பற்றிய தகவலுடன். |
| 'message' | ஒரு worker செய்தியைப் பெறும் போது வெளியிடப்படுகிறது. கேட்போர் வாதங்களைப் பெறுகிறது (message, handle) இங்கு message என்பது அனுப்பப்பட்ட செய்தி மற்றும் handle என்பது net.Socket அல்லது net.Server பொருள் அல்லது undefined. |
| 'online' | worker செயல்முறை forked மற்றும் செய்திகளைப் பெறத் தயாராக இருக்கும் போது வெளியிடப்படுகிறது. |
அடிப்படை கிளஸ்டர் எடுத்துக்காட்டு
பல-செயல்முறை HTTP சேவையகத்தை உருவாக்க Worker பொருள்களுடன் கிளஸ்டரைப் பயன்படுத்துவதற்கான அடிப்படை எடுத்துக்காட்டு இங்கே:
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isPrimary) {
console.log(`Primary ${process.pid} is running`);
// Fork workers
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// Listen for dying workers
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} died with code: ${code} and signal: ${signal}`);
console.log('Starting a new worker');
cluster.fork();
});
// Event handlers for Worker objects
cluster.on('fork', (worker) => {
console.log(`Worker ${worker.id} (PID: ${worker.process.pid}) has been forked`);
});
cluster.on('online', (worker) => {
console.log(`Worker ${worker.id} is online`);
});
cluster.on('listening', (worker, address) => {
console.log(`Worker ${worker.id} is listening on ${address.address}:${address.port}`);
});
cluster.on('disconnect', (worker) => {
console.log(`Worker ${worker.id} has disconnected`);
});
} else {
// Workers can share any TCP connection
// In this case it is an HTTP server
http.createServer((req, res) => {
res.writeHead(200);
res.end(`Hello from Worker ${process.pid}\n`);
}).listen(8000);
console.log(`Worker ${process.pid} started`);
}
Worker தகவல்தொடர்பு
முதன்மை செயல்முறை மற்றும் worker செயல்முறைகளுக்கு இடையே செய்திகளை அனுப்பலாம்:
const cluster = require('cluster');
const http = require('http');
if (cluster.isPrimary) {
// Keep track of http requests
let numRequests = 0;
// Create two workers
const worker1 = cluster.fork();
const worker2 = cluster.fork();
// Count requests
function messageHandler(msg) {
if (msg.cmd && msg.cmd === 'notifyRequest') {
numRequests += 1;
console.log(`Total requests: ${numRequests}`);
}
}
// Listen for messages from workers
worker1.on('message', messageHandler);
worker2.on('message', messageHandler);
// Send periodic messages to workers
setInterval(() => {
// Send a message to both workers
worker1.send({ cmd: 'updateTime', time: Date.now() });
worker2.send({ cmd: 'updateTime', time: Date.now() });
}, 5000);
} else {
// Worker process
// Track the last update time
let lastUpdate = Date.now();
// Receive messages from the primary
process.on('message', (msg) => {
if (msg.cmd && msg.cmd === 'updateTime') {
lastUpdate = msg.time;
console.log(`Worker ${process.pid} received time update: ${new Date(lastUpdate)}`);
}
});
// Create an HTTP server
http.createServer((req, res) => {
// Notify the primary about the request
process.send({ cmd: 'notifyRequest' });
// Respond to the request
res.writeHead(200);
res.end(`Hello from Worker ${process.pid}. Last update: ${new Date(lastUpdate)}\n`);
}).listen(8000);
console.log(`Worker ${process.pid} started`);
}
நேர்த்தியான மூடுதல்
Production பயன்பாடுகளுக்கு workers இன் நேர்த்தியான மூடுதலைக் கையாள்வது முக்கியமானது:
const cluster = require('cluster');
const http = require('http');
if (cluster.isPrimary) {
console.log(`Primary ${process.pid} is running`);
// Fork workers
const numCPUs = require('os').cpus().length;
const workers = [];
for (let i = 0; i < numCPUs; i++) {
workers.push(cluster.fork());
}
// Graceful shutdown function
const shutdown = () => {
console.log('Primary: starting graceful shutdown...');
// Disconnect all workers
for (const worker of workers) {
console.log(`Disconnecting worker ${worker.id}`);
worker.disconnect();
}
// Exit after a timeout if workers haven't exited
setTimeout(() => {
console.log('Primary: some workers did not exit, forcing shutdown');
process.exit(1);
}, 5000);
};
// Listen for worker events
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} died (${signal || code}). ` +
`exitedAfterDisconnect: ${worker.exitedAfterDisconnect}`);
// If it's a planned disconnect, don't restart
if (!worker.exitedAfterDisconnect) {
console.log('Worker died unexpectedly, replacing it...');
workers.push(cluster.fork());
}
// Check if all workers are gone
let activeWorkers = 0;
for (const id in cluster.workers) {
activeWorkers++;
}
console.log(`Active workers: ${activeWorkers}`);
if (activeWorkers === 0) {
console.log('All workers have exited, shutting down primary');
process.exit(0);
}
});
// Handle signals for graceful shutdown
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
} else {
// Worker process
// Create a server
const server = http.createServer((req, res) => {
res.writeHead(200);
res.end(`Hello from worker ${process.pid}\n`);
});
server.listen(8000);
console.log(`Worker ${process.pid} started`);
// Handle disconnect signal from primary
process.on('disconnect', () => {
console.log(`Worker ${process.pid} disconnected, closing server...`);
// Close the server
server.close(() => {
console.log(`Worker ${process.pid} closed server, exiting`);
process.exit(0);
});
// Forcefully exit after timeout
setTimeout(() => {
console.log(`Worker ${process.pid} timed out closing server, forcing exit`);
process.exit(1);
}, 2000);
});
}
Worker பூஜ்ஜிய-இடைநிறுத்தம் மீண்டும் தொடக்கம்
ரோலிங் worker புதுப்பிப்புகளுக்கான பூஜ்ஜிய-இடைநிறுத்தம் மீண்டும் தொடக்க முறையை செயல்படுத்துதல்:
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isPrimary) {
console.log(`Primary ${process.pid} is running`);
// Fork initial workers
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// Store worker refs
let workers = Object.values(cluster.workers);
// Restart one worker at a time
function restartWorker(workerIndex) {
const worker = workers[workerIndex];
console.log(`Restarting worker #${worker.id}`);
// Create a new worker
const newWorker = cluster.fork();
// Add the new worker to our array
workers.push(newWorker);
// When the new worker is online, disconnect the old worker
newWorker.on('online', () => {
if (worker) {
console.log(`New worker #${newWorker.id} is online, disconnecting old worker #${worker.id}`);
worker.disconnect();
}
});
// When the old worker is disconnected, remove it from the array
worker.on('disconnect', () => {
console.log(`Worker #${worker.id} disconnected`);
workers = workers.filter(w => w.id !== worker.id);
});
// Continue the process if there are more workers to restart
if (workerIndex + 1 < workers.length) {
setTimeout(() => {
restartWorker(workerIndex + 1);
}, 5000);
}
}
// Example: trigger a rolling restart after 15 seconds
setTimeout(() => {
console.log('Starting rolling restart of workers...');
restartWorker(0);
}, 15000);
// Additional event handlers
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} exited with code ${code}`);
});
} else {
// Worker process
http.createServer((req, res) => {
res.writeHead(200);
res.end(`Hello from worker ${process.pid}, started at ${new Date().toISOString()}\n`);
}).listen(8000);
console.log(`Worker ${process.pid} started`);
}
Worker நிலை கண்காணிப்பு
Worker நிலையைக் கண்காணித்தல் மற்றும் அளவீடுகளைச் சேகரித்தல்:
const cluster = require('cluster');
const http = require('http');
const os = require('os');
if (cluster.isPrimary) {
console.log(`Primary ${process.pid} is running`);
// Fork workers
const workers = [];
for (let i = 0; i < os.cpus().length; i++) {
workers.push(cluster.fork());
}
// Store metrics for each worker
const workerMetrics = {};
// Set up metrics collection
for (const worker of workers) {
workerMetrics[worker.id] = {
id: worker.id,
pid: worker.process.pid,
requests: 0,
errors: 0,
lastActive: Date.now(),
memoryUsage: {}
};
// Handle messages from workers
worker.on('message', (msg) => {
if (msg.type === 'metrics') {
// Update metrics
workerMetrics[worker.id] = {
...workerMetrics[worker.id],
...msg.data,
lastActive: Date.now()
};
}
});
}
// Create an HTTP server for monitoring
http.createServer((req, res) => {
if (req.url === '/metrics') {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({
workers: Object.values(workerMetrics),
system: {
loadAvg: os.loadavg(),
totalMem: os.totalmem(),
freeMem: os.freemem(),
uptime: os.uptime()
}
}, null, 2));
} else {
res.writeHead(404);
res.end('Not found');
}
}).listen(8001);
console.log('Primary: Monitoring server running on port 8001');
// Check for unresponsive workers
setInterval(() => {
const now = Date.now();
for (const worker of workers) {
const metrics = workerMetrics[worker.id];
// If worker hasn't reported in 30 seconds
if (now - metrics.lastActive > 80800) {
console.warn(`Worker ${worker.id} appears unresponsive, restarting...`);
// Kill the unresponsive worker
worker.kill();
// Fork a replacement
const newWorker = cluster.fork();
// Set up metrics for new worker
workerMetrics[newWorker.id] = {
id: newWorker.id,
pid: newWorker.process.pid,
requests: 0,
errors: 0,
lastActive: Date.now(),
memoryUsage: {}
};
// Replace in workers array
const index = workers.indexOf(worker);
if (index !== -1) {
workers[index] = newWorker;
}
// Clean up old metrics
delete workerMetrics[worker.id];
}
}
}, 10000);
} else {
// Worker process
console.log(`Worker ${process.pid} started`);
// Track metrics
let requestCount = 0;
let errorCount = 0;
// Report metrics to primary every 5 seconds
setInterval(() => {
process.send({
type: 'metrics',
data: {
requests: requestCount,
errors: errorCount,
memoryUsage: process.memoryUsage()
}
});
}, 5000);
// Create HTTP server
http.createServer((req, res) => {
requestCount++;
try {
res.writeHead(200);
res.end(`Hello from worker ${process.pid}\n`);
} catch (error) {
errorCount++;
console.error(`Worker ${process.pid} error:`, error);
}
}).listen(8000);
}
Worker சிறந்த நடைமுறைகள்
1. நிலை தனிமைப்படுத்தலை உறுதி செய்யுங்கள்
Worker செயல்முறைகளை நிலையற்றதாக வைத்திருங்கள் அல்லது சரியான நிலை மேலாண்மையை உறுதி செய்யுங்கள்:
// BAD - State shared across forked processes won't work as expected
let requestCount = 0;
// GOOD - Each worker has its own isolated state
if (cluster.isPrimary) {
// Primary logic
} else {
// Worker-specific state
let workerRequestCount = 0;
}
2. எதிர்பாராத Worker முடிவைக் கையாளுங்கள்
if (cluster.isPrimary) {
cluster.on('exit', (worker, code, signal) => {
if (code !== 0 && !worker.exitedAfterDisconnect) {
console.log(`Worker ${worker.id} crashed. Restarting...`);
cluster.fork();
}
});
}
3. Worker Sticky Sessions பயன்படுத்தவும்
const cluster = require('cluster');
const http = require('http');
if (cluster.isPrimary) {
// Setup sticky session logic
cluster.schedulingPolicy = cluster.SCHED_NONE;
// Start workers
const numCPUs = require('os').cpus().length;
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// Create routes based on connection's remote IP
cluster.on('connection', (connection, address) => {
// Calculate which worker gets connection based on IP
const worker = Object.values(cluster.workers)[
Number(address.toString().split(':')[3]) % Object.keys(cluster.workers).length
];
worker.send('sticky-session:connection', connection);
});
} else {
// Worker code
http.createServer((req, res) => {
res.end(`Handled by worker ${process.pid}`);
}).listen(8000, () => {
console.log(`Worker ${process.pid} listening`);
});
// Receive sticky connections
process.on('message', (message, connection) => {
if (message !== 'sticky-session:connection') return;
// Emulate a connection event on the server
server.emit('connection', connection);
connection.resume();
});
}