শ্রমিক বস্তু
কর্মী শ্রেণী হল Node.js ক্লাস্টার মডিউলের অংশ, যা শিশু প্রক্রিয়া (শ্রমিক) তৈরি করতে সক্ষম করে যা একযোগে চলে এবং সার্ভার পোর্ট শেয়ার করে। লোড হ্যান্ডেল করার জন্য মাল্টি-কোর সিস্টেমের সুবিধা নেওয়া বিশেষভাবে কার্যকর।
কর্মী আমদানি করা
ক্লাস্টার মডিউল ব্যবহার করার সময় কর্মী বস্তুগুলি স্বয়ংক্রিয়ভাবে তৈরি হয়:
// Workers are created via the cluster module
const cluster = require('cluster');
// To access a Worker object
if (cluster.isPrimary) {
// Fork workers
const worker = cluster.fork();
// Now 'worker' is a Worker object
}
শ্রমিক বৈশিষ্ট্য
| বৈশিষ্ট্য | ব্যাখ্যা |
|---|---|
| worker.id | প্রতিটি কর্মীকে একটি অনন্য আইডি বরাদ্দ করা হবে। |
| worker.process | সমস্ত কর্মী child_process.fork() ব্যবহার করে তৈরি করা হয়েছে এবং এই সম্পত্তি সেই কলের ফলাফল ধারণ করে। |
| worker.exitedAfterDisconnect | এই বৈশিষ্ট্যটি সত্য যদি কর্মী .kill() বা .disconnect() এর কারণে প্রস্থান করে, অন্যথায় এটি অনির্ধারিত। |
| worker.isConnected() | কর্মী তার পিতামাতার সাথে সংযুক্ত থাকলে সত্য দেখায়, অন্যথায় মিথ্যা। |
| worker.isDead() | কর্মীর প্রক্রিয়া বন্ধ হলে (সংকেত বা প্রস্থান কোড দ্বারা) সত্য ফেরত দেয়, অন্যথায় মিথ্যা। |
কর্মী পদ্ধতি
| পদ্ধতি | ব্যাখ্যা |
|---|---|
| worker.disconnect() | একজন কর্মীর মধ্যে, এই ফাংশনটি সমস্ত সার্ভার বন্ধ করে, সেই সার্ভারগুলিতে একটি 'ক্লোজ' ইভেন্টের জন্য অপেক্ষা করে এবং তারপর IPC চ্যানেল সংযোগ বিচ্ছিন্ন করে। প্রাথমিকভাবে, কর্মীকে একটি অভ্যন্তরীণ বার্তা পাঠানো হয়, যার ফলে এটি নিজেই .disconnect() কল করে। |
| worker.kill([signal='SIGTERM']) | কর্মী প্রক্রিয়াকে হত্যা করে। এই ফাংশন worker.process.kill() এর অনুরূপ। ঐচ্ছিক সংকেত পরামিতি নির্দিষ্ট করে কর্মীকে কোন সংকেত পাঠাতে হবে। |
| worker.send(message[, sendHandle[, options]][, callback]) | 'বার্তা' ইভেন্ট হিসাবে প্রাপ্ত কর্মীকে একটি বার্তা পাঠায়। অভ্যন্তরীণভাবে child_process.send() ব্যবহার করে। |
কর্মীর দৃষ্টান্ত
| ঘটনা | ব্যাখ্যা |
|---|---|
| 'disconnect' | একজন কর্মী আইপিসি চ্যানেল সংযোগ বিচ্ছিন্ন হওয়ার পরে মুক্তি পায়৷ এটি ঘটে যখন একজন কর্মী সুন্দরভাবে প্রস্থান করে, নিহত হয় বা ম্যানুয়ালি সংযোগ বিচ্ছিন্ন হয় (worker.disconnect() ব্যবহার করে)। |
| 'error' | যদি ওয়ার্কার থ্রেড কোনো ধরা না পড়া ব্যতিক্রম ছুড়ে দেয় তাহলে জারি করা হবে। |
| 'exit' | কর্মী প্রক্রিয়া শেষ হলে নির্গত হয়। শ্রোতা আর্গুমেন্ট (কোড, সিগন্যাল) নেয় যেখানে কোড হল প্রস্থান কোড এবং সিগন্যাল হল সেই সিগন্যালের নাম যা প্রক্রিয়াটি বন্ধ করে দেয়। |
| 'listening' | যখন একজন কর্মী সহ সার্ভার সংযোগের জন্য জিজ্ঞাসা করা শুরু করে তখন মুক্তি হয়৷ শ্রোতা ব্যবহার করা ঠিকানা সম্পর্কে তথ্য সহ আর্গুমেন্ট (ঠিকানা) গ্রহণ করে। |
| 'message' | একজন কর্মী বার্তা পেলে নির্গত হয়। শ্রোতা আর্গুমেন্ট (বার্তা, হ্যান্ডেল) নেয় যেখানে বার্তাটি পাঠানো বার্তা এবং হ্যান্ডেল একটি নেট। সকেট বা নেট। সার্ভার অবজেক্ট বা অনির্ধারিত। |
| 'online' | কর্মী প্রক্রিয়া ফর্কড এবং বার্তা গ্রহণের জন্য প্রস্তুত হলে মুক্তি পায়। |
বেসিক ক্লাস্টার উদাহরণ
এখানে একটি মাল্টি-প্রসেস HTTP সার্ভার তৈরি করতে Worker অবজেক্টের সাথে একটি ক্লাস্টার ব্যবহার করার একটি মৌলিক উদাহরণ রয়েছে:
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isPrimary) {
console.log(`Primary ${process.pid} is running`);
// Fork workers
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// Listen for dying workers
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} died with code: ${code} and signal: ${signal}`);
console.log('Starting a new worker');
cluster.fork();
});
// Event handlers for Worker objects
cluster.on('fork', (worker) => {
console.log(`Worker ${worker.id} (PID: ${worker.process.pid}) has been forked`);
});
cluster.on('online', (worker) => {
console.log(`Worker ${worker.id} is online`);
});
cluster.on('listening', (worker, address) => {
console.log(`Worker ${worker.id} is listening on ${address.address}:${address.port}`);
});
cluster.on('disconnect', (worker) => {
console.log(`Worker ${worker.id} has disconnected`);
});
} else {
// Workers can share any TCP connection
// In this case it is an HTTP server
http.createServer((req, res) => {
res.writeHead(200);
res.end(`Hello from Worker ${process.pid}\n`);
}).listen(8000);
console.log(`Worker ${process.pid} started`);
}
কর্মী যোগাযোগ
মাস্টার প্রক্রিয়া এবং কর্মী প্রক্রিয়ার মধ্যে বার্তা প্রেরণ করা যেতে পারে:
const cluster = require('cluster');
const http = require('http');
if (cluster.isPrimary) {
// Keep track of http requests
let numRequests = 0;
// Create two workers
const worker1 = cluster.fork();
const worker2 = cluster.fork();
// Count requests
function messageHandler(msg) {
if (msg.cmd && msg.cmd === 'notifyRequest') {
numRequests += 1;
console.log(`Total requests: ${numRequests}`);
}
}
// Listen for messages from workers
worker1.on('message', messageHandler);
worker2.on('message', messageHandler);
// Send periodic messages to workers
setInterval(() => {
// Send a message to both workers
worker1.send({ cmd: 'updateTime', time: Date.now() });
worker2.send({ cmd: 'updateTime', time: Date.now() });
}, 5000);
} else {
// Worker process
// Track the last update time
let lastUpdate = Date.now();
// Receive messages from the primary
process.on('message', (msg) => {
if (msg.cmd && msg.cmd === 'updateTime') {
lastUpdate = msg.time;
console.log(`Worker ${process.pid} received time update: ${new Date(lastUpdate)}`);
}
});
// Create an HTTP server
http.createServer((req, res) => {
// Notify the primary about the request
process.send({ cmd: 'notifyRequest' });
// Respond to the request
res.writeHead(200);
res.end(`Hello from Worker ${process.pid}. Last update: ${new Date(lastUpdate)}\n`);
}).listen(8000);
console.log(`Worker ${process.pid} started`);
}
ঝরঝরে বন্ধ
উত্পাদন অ্যাপ্লিকেশনের জন্য শ্রমিকদের মার্জিত বন্ধ পরিচালনা করা গুরুত্বপূর্ণ:
const cluster = require('cluster');
const http = require('http');
if (cluster.isPrimary) {
console.log(`Primary ${process.pid} is running`);
// Fork workers
const numCPUs = require('os').cpus().length;
const workers = [];
for (let i = 0; i < numCPUs; i++) {
workers.push(cluster.fork());
}
// Graceful shutdown function
const shutdown = () => {
console.log('Primary: starting graceful shutdown...');
// Disconnect all workers
for (const worker of workers) {
console.log(`Disconnecting worker ${worker.id}`);
worker.disconnect();
}
// Exit after a timeout if workers haven't exited
setTimeout(() => {
console.log('Primary: some workers did not exit, forcing shutdown');
process.exit(1);
}, 5000);
};
// Listen for worker events
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} died (${signal || code}). ` +
`exitedAfterDisconnect: ${worker.exitedAfterDisconnect}`);
// If it's a planned disconnect, don't restart
if (!worker.exitedAfterDisconnect) {
console.log('Worker died unexpectedly, replacing it...');
workers.push(cluster.fork());
}
// Check if all workers are gone
let activeWorkers = 0;
for (const id in cluster.workers) {
activeWorkers++;
}
console.log(`Active workers: ${activeWorkers}`);
if (activeWorkers === 0) {
console.log('All workers have exited, shutting down primary');
process.exit(0);
}
});
// Handle signals for graceful shutdown
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
} else {
// Worker process
// Create a server
const server = http.createServer((req, res) => {
res.writeHead(200);
res.end(`Hello from worker ${process.pid}\n`);
});
server.listen(8000);
console.log(`Worker ${process.pid} started`);
// Handle disconnect signal from primary
process.on('disconnect', () => {
console.log(`Worker ${process.pid} disconnected, closing server...`);
// Close the server
server.close(() => {
console.log(`Worker ${process.pid} closed server, exiting`);
process.exit(0);
});
// Forcefully exit after timeout
setTimeout(() => {
console.log(`Worker ${process.pid} timed out closing server, forcing exit`);
process.exit(1);
}, 2000);
});
}
কর্মী জিরো-পজ রিস্টার্ট
রোলিং ওয়ার্কার আপডেটের জন্য শূন্য-পজ রিস্টার্ট পদ্ধতি প্রয়োগ করা:
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isPrimary) {
console.log(`Primary ${process.pid} is running`);
// Fork initial workers
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// Store worker refs
let workers = Object.values(cluster.workers);
// Restart one worker at a time
function restartWorker(workerIndex) {
const worker = workers[workerIndex];
console.log(`Restarting worker #${worker.id}`);
// Create a new worker
const newWorker = cluster.fork();
// Add the new worker to our array
workers.push(newWorker);
// When the new worker is online, disconnect the old worker
newWorker.on('online', () => {
if (worker) {
console.log(`New worker #${newWorker.id} is online, disconnecting old worker #${worker.id}`);
worker.disconnect();
}
});
// When the old worker is disconnected, remove it from the array
worker.on('disconnect', () => {
console.log(`Worker #${worker.id} disconnected`);
workers = workers.filter(w => w.id !== worker.id);
});
// Continue the process if there are more workers to restart
if (workerIndex + 1 < workers.length) {
setTimeout(() => {
restartWorker(workerIndex + 1);
}, 5000);
}
}
// Example: trigger a rolling restart after 15 seconds
setTimeout(() => {
console.log('Starting rolling restart of workers...');
restartWorker(0);
}, 15000);
// Additional event handlers
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} exited with code ${code}`);
});
} else {
// Worker process
http.createServer((req, res) => {
res.writeHead(200);
res.end(`Hello from worker ${process.pid}, started at ${new Date().toISOString()}\n`);
}).listen(8000);
console.log(`Worker ${process.pid} started`);
}
কর্মীর অবস্থা পর্যবেক্ষণ
কর্মীর অবস্থা পর্যবেক্ষণ এবং মেট্রিক্স সংগ্রহ:
const cluster = require('cluster');
const http = require('http');
const os = require('os');
if (cluster.isPrimary) {
console.log(`Primary ${process.pid} is running`);
// Fork workers
const workers = [];
for (let i = 0; i < os.cpus().length; i++) {
workers.push(cluster.fork());
}
// Store metrics for each worker
const workerMetrics = {};
// Set up metrics collection
for (const worker of workers) {
workerMetrics[worker.id] = {
id: worker.id,
pid: worker.process.pid,
requests: 0,
errors: 0,
lastActive: Date.now(),
memoryUsage: {}
};
// Handle messages from workers
worker.on('message', (msg) => {
if (msg.type === 'metrics') {
// Update metrics
workerMetrics[worker.id] = {
...workerMetrics[worker.id],
...msg.data,
lastActive: Date.now()
};
}
});
}
// Create an HTTP server for monitoring
http.createServer((req, res) => {
if (req.url === '/metrics') {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({
workers: Object.values(workerMetrics),
system: {
loadAvg: os.loadavg(),
totalMem: os.totalmem(),
freeMem: os.freemem(),
uptime: os.uptime()
}
}, null, 2));
} else {
res.writeHead(404);
res.end('Not found');
}
}).listen(8001);
console.log('Primary: Monitoring server running on port 8001');
// Check for unresponsive workers
setInterval(() => {
const now = Date.now();
for (const worker of workers) {
const metrics = workerMetrics[worker.id];
// If worker hasn't reported in 30 seconds
if (now - metrics.lastActive > 80800) {
console.warn(`Worker ${worker.id} appears unresponsive, restarting...`);
// Kill the unresponsive worker
worker.kill();
// Fork a replacement
const newWorker = cluster.fork();
// Set up metrics for new worker
workerMetrics[newWorker.id] = {
id: newWorker.id,
pid: newWorker.process.pid,
requests: 0,
errors: 0,
lastActive: Date.now(),
memoryUsage: {}
};
// Replace in workers array
const index = workers.indexOf(worker);
if (index !== -1) {
workers[index] = newWorker;
}
// Clean up old metrics
delete workerMetrics[worker.id];
}
}
}, 10000);
} else {
// Worker process
console.log(`Worker ${process.pid} started`);
// Track metrics
let requestCount = 0;
let errorCount = 0;
// Report metrics to primary every 5 seconds
setInterval(() => {
process.send({
type: 'metrics',
data: {
requests: requestCount,
errors: errorCount,
memoryUsage: process.memoryUsage()
}
});
}, 5000);
// Create HTTP server
http.createServer((req, res) => {
requestCount++;
try {
res.writeHead(200);
res.end(`Hello from worker ${process.pid}\n`);
} catch (error) {
errorCount++;
console.error(`Worker ${process.pid} error:`, error);
}
}).listen(8000);
}
কর্মীর সর্বোত্তম অনুশীলন
1. স্তর বিচ্ছিন্নতা নিশ্চিত করুন
কর্মী প্রক্রিয়াগুলিকে রাষ্ট্রহীন রাখুন বা যথাযথ রাষ্ট্র পরিচালনা নিশ্চিত করুন:
// BAD - State shared across forked processes won't work as expected
let requestCount = 0;
// GOOD - Each worker has its own isolated state
if (cluster.isPrimary) {
// Primary logic
} else {
// Worker-specific state
let workerRequestCount = 0;
}
2. অপ্রত্যাশিত কর্মী ফলাফল হ্যান্ডেল
if (cluster.isPrimary) {
cluster.on('exit', (worker, code, signal) => {
if (code !== 0 && !worker.exitedAfterDisconnect) {
console.log(`Worker ${worker.id} crashed. Restarting...`);
cluster.fork();
}
});
}
3. ওয়ার্কার স্টিকি সেশন ব্যবহার করুন
const cluster = require('cluster');
const http = require('http');
if (cluster.isPrimary) {
// Setup sticky session logic
cluster.schedulingPolicy = cluster.SCHED_NONE;
// Start workers
const numCPUs = require('os').cpus().length;
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// Create routes based on connection's remote IP
cluster.on('connection', (connection, address) => {
// Calculate which worker gets connection based on IP
const worker = Object.values(cluster.workers)[
Number(address.toString().split(':')[3]) % Object.keys(cluster.workers).length
];
worker.send('sticky-session:connection', connection);
});
} else {
// Worker code
http.createServer((req, res) => {
res.end(`Handled by worker ${process.pid}`);
}).listen(8000, () => {
console.log(`Worker ${process.pid} listening`);
});
// Receive sticky connections
process.on('message', (message, connection) => {
if (message !== 'sticky-session:connection') return;
// Emulate a connection event on the server
server.emit('connection', connection);
connection.resume();
});
}