2.6 System Logging
Understanding System Logs
Linux logging is the foundation of observability in DevOps environments. Modern systems combine systemd journald with traditional log files, container logs, and centralized logging solutions.
Critical Log Categories:
# System logs
/var/log/messages # General system messages (RHEL/CentOS)
/var/log/syslog # System logs (Debian/Ubuntu)
/var/log/kern.log # Kernel messages
/var/log/dmesg # Boot messages
# Security logs
/var/log/secure # Authentication logs (RHEL/CentOS)
/var/log/auth.log # Authentication logs (Debian/Ubuntu)
/var/log/faillog # Failed login attempts
/var/log/lastlog # Last login information
# Service logs
/var/log/cron # Cron job execution
/var/log/mail.log # Mail server logs
/var/log/apache2/ # Apache web server logs
/var/log/nginx/ # Nginx web server logs
# Application logs
/var/log/myapp/ # Custom application logs
/opt/app/logs/ # Application-specific directories
DevOps Logging Context:
Modern logging involves multiple layers:
System Logs: Traditional syslog and journald
Container Logs: Docker and Kubernetes container output
Application Logs: Structured logging (JSON, structured text)
Centralized Logging: ELK Stack, Splunk, CloudWatch
Metrics Integration: Prometheus, Grafana, DataDog
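Application-level structured logging can be sketched with nothing but the standard library. The `JsonFormatter` class and the `myapp` logger name below are illustrative choices, not part of any particular framework:

```python
# Minimal structured (JSON) logging sketch using only the standard library.
import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON line."""
    def format(self, record):
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("myapp")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("service started")  # emits a one-line JSON record
```

One-record-per-line JSON is what shippers like Filebeat and Fluentd expect, which is why it is the common denominator for centralized logging.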
journalctl (systemd logs)
Advanced journalctl Usage:
# Basic log viewing
journalctl # All logs (paginated)
journalctl --no-pager # All logs without pager
journalctl -n 50 # Last 50 entries
journalctl -f # Follow logs in real-time
# Service-specific logs
journalctl -u nginx # Nginx service logs
journalctl -u docker # Docker daemon logs
journalctl -u kubelet # Kubernetes kubelet logs
journalctl -u ssh.service # SSH service logs
# Time-based filtering
journalctl --since today # Today's logs
journalctl --since yesterday # Yesterday's logs
journalctl --since "2025-01-01" # From specific date
journalctl --since "1 hour ago" # Last hour
journalctl --since "30 min ago" # Last 30 minutes
journalctl --until "2025-01-01 12:00" # Until specific time
# Priority-based filtering
journalctl -p emerg # Emergency only (priority 0)
journalctl -p alert # Alert and above (priority 1)
journalctl -p crit # Critical and above (priority 2)
journalctl -p err # Error and above (priority 3)
journalctl -p warning # Warning and above (priority 4)
journalctl -p notice # Notice and above (priority 5)
journalctl -p info # Info and above (priority 6)
journalctl -p debug # All messages (priority 7)
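The named priorities map to the numeric syslog severities (RFC 5424), and `-p` shows the given level plus everything more severe (lower number). A small sketch of that mapping:

```python
# Syslog severity levels used by journalctl -p (RFC 5424 numbering).
# "journalctl -p err" shows priority 3 and anything more severe (0-3).
PRIORITIES = {
    "emerg": 0, "alert": 1, "crit": 2, "err": 3,
    "warning": 4, "notice": 5, "info": 6, "debug": 7,
}

def shown_levels(threshold: str) -> list:
    """Names of the levels included by `journalctl -p <threshold>`."""
    limit = PRIORITIES[threshold]
    return [name for name, num in PRIORITIES.items() if num <= limit]

print(shown_levels("err"))  # ['emerg', 'alert', 'crit', 'err']
```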
# Boot-specific logs
journalctl --list-boots # List all boots
journalctl -b # Current boot
journalctl -b -1 # Previous boot
journalctl -b -2 # Two boots ago
# Kernel and hardware logs
journalctl -k # Kernel messages
journalctl -k -b # Kernel messages from current boot
# Advanced filtering
journalctl _PID=1234 # Logs from specific PID
journalctl _UID=1000 # Logs from specific user
journalctl _COMM=nginx # Logs from specific command
journalctl SYSLOG_FACILITY=10 # Logs from facility 10 (authpriv)
# Output formatting
journalctl -o json # JSON format
journalctl -o json-pretty # Pretty JSON format
journalctl -o cat # Only message content
journalctl -o short-iso # ISO timestamp format
# Disk usage and maintenance
journalctl --disk-usage # Show journal disk usage
journalctl --vacuum-size=100M # Keep only 100MB of logs
journalctl --vacuum-time=2weeks # Keep only 2 weeks of logs
journalctl --rotate # Force log rotation
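The `json` output format pairs well with scripted analysis: each line of `journalctl -o json` is one JSON object. A minimal parsing sketch — the sample line is fabricated, but the field names (`_SYSTEMD_UNIT`, `PRIORITY`, `MESSAGE`) are ones journald actually emits:

```python
# Each line of `journalctl -o json` is one JSON object; in production you
# would read these lines from `journalctl -o json --no-pager` via subprocess.
# The sample record below is fabricated for illustration.
import json

sample = ('{"__REALTIME_TIMESTAMP": "1736500000000000", "PRIORITY": "3", '
          '"_SYSTEMD_UNIT": "nginx.service", "MESSAGE": "worker process exited"}')

def parse_journal_line(line: str) -> dict:
    """Pull the commonly used fields out of one journal JSON record."""
    record = json.loads(line)
    return {
        "unit": record.get("_SYSTEMD_UNIT", "unknown"),
        "priority": int(record.get("PRIORITY", 6)),
        "message": record.get("MESSAGE", ""),
    }

entry = parse_journal_line(sample)
print(entry["unit"], entry["priority"])  # nginx.service 3
```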
Traditional Log Tools
Advanced Log File Analysis:
# Real-time log monitoring
tail -f /var/log/syslog # Follow single log
tail -f /var/log/{syslog,auth.log} # Follow multiple logs
multitail /var/log/syslog /var/log/auth.log # Side-by-side monitoring
# Log content analysis
less /var/log/syslog # Paginated viewing
zless /var/log/syslog.1.gz # View compressed logs
head -n 100 /var/log/messages # First 100 lines
tail -n 500 /var/log/auth.log # Last 500 lines
# Searching and filtering
grep -i "error" /var/log/syslog # Case-insensitive search
grep -n "failed" /var/log/auth.log # Show line numbers
grep -A 5 -B 5 "error" /var/log/syslog # Show context lines
grep -r "pattern" /var/log/ # Recursive search
zgrep "pattern" /var/log/*.gz # Search compressed logs
# Modern search tools
rg "error" /var/log/ # ripgrep (faster than grep)
ag "pattern" /var/log/ # silver searcher
# Log statistics and analysis
awk '{print $1}' /var/log/access.log | sort | uniq -c # Count by first field
cut -d' ' -f1 /var/log/access.log | sort | uniq -c # Extract and count client IPs
# Date-based log analysis
sed -n '/Jan 10/,/Jan 11/p' /var/log/syslog # Extract date range
awk '/2025-01-10/' /var/log/syslog # Filter by date pattern
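The `awk | sort | uniq -c` pipeline above has a direct Python equivalent using `collections.Counter`. The sample access-log lines here are made up for illustration:

```python
# Python equivalent of: awk '{print $1}' access.log | sort | uniq -c
# using an in-memory sample instead of a real access log.
from collections import Counter

sample_lines = [
    '10.0.0.1 - - [10/Jan/2025] "GET / HTTP/1.1" 200',
    '10.0.0.2 - - [10/Jan/2025] "GET /login HTTP/1.1" 404',
    '10.0.0.1 - - [10/Jan/2025] "POST /api HTTP/1.1" 200',
]

# Count occurrences of the first whitespace-separated field (the client IP)
counts = Counter(line.split()[0] for line in sample_lines)
for ip, n in counts.most_common():
    print(f"{n:7d} {ip}")  # same shape as `uniq -c` output
```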
Log Rotation Management:
# Log rotation configuration
cat /etc/logrotate.conf # Main configuration
ls /etc/logrotate.d/ # Service-specific configs
# Manual log rotation
logrotate -d /etc/logrotate.conf # Dry run (debug mode)
logrotate -f /etc/logrotate.conf # Force rotation
logrotate -v /etc/logrotate.conf # Verbose output
# Check rotation status
cat /var/lib/logrotate/status # Last rotation times
ls -la /var/log/ | grep -E "\.(gz|bz2)$" # Rotated log files
# Custom log rotation example
cat > /etc/logrotate.d/myapp << 'EOF'
/var/log/myapp/*.log {
    daily
    missingok
    rotate 30
    compress
    delaycompress
    notifempty
    create 644 myapp myapp
    postrotate
        systemctl reload myapp
    endscript
}
EOF
Container and Cloud Logging:
# Docker container logs
docker logs container_name # View container logs
docker logs -f container_name # Follow container logs
docker logs --since 1h container_name # Last hour of logs
docker logs --tail 100 container_name # Last 100 lines
# Kubernetes pod logs
kubectl logs pod_name # Pod logs
kubectl logs -f pod_name # Follow pod logs
kubectl logs pod_name -c container # Specific container in pod
kubectl logs --previous pod_name # Previous container instance
# Cloud logging (AWS CloudWatch example)
aws logs describe-log-groups # List log groups
aws logs tail /aws/lambda/function # Tail CloudWatch logs
# Centralized logging setup
# Filebeat configuration for ELK stack
cat > /etc/filebeat/filebeat.yml << 'EOF'
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/*.log
      - /var/log/myapp/*.log

output.elasticsearch:
  hosts: ["elasticsearch:9200"]

processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
EOF
Python Log Analysis
Production-Ready Log Analysis Framework:
#!/usr/bin/env python3
# advanced_log_analyzer.py - DevOps log analysis
import re
import json
import subprocess
import gzip
import logging
from datetime import datetime
from collections import defaultdict, Counter
from pathlib import Path
from typing import Dict, List, Optional
import ipaddress
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class LogAnalyzer:
    """Advanced log analysis for DevOps environments"""

    def __init__(self):
        self.patterns = {
            # SSH patterns
            'ssh_failed': r'Failed password for (?:invalid user )?(\w+) from (\d+\.\d+\.\d+\.\d+)',
            'ssh_success': r'Accepted (?:password|publickey) for (\w+) from (\d+\.\d+\.\d+\.\d+)',
            'ssh_disconnect': r'Disconnected from (\d+\.\d+\.\d+\.\d+)',
            # Web server patterns
            'nginx_error': r'(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] .* client: (\d+\.\d+\.\d+\.\d+)',
            'apache_error': r'\[([^\]]+)\] \[(\w+)\] .* client (\d+\.\d+\.\d+\.\d+)',
            'http_404': r'(\d+\.\d+\.\d+\.\d+) .* "(?:GET|POST|PUT|DELETE) ([^"]*)" 404',
            # System patterns
            'oom_killer': r'Out of memory: Kill process (\d+) \(([^)]+)\)',
            'segfault': r'segfault at ([0-9a-f]+) .* in ([^\[]+)',
            'kernel_error': r'kernel: \[([\d.]+)\] (.+)',
            # Security patterns
            'sudo_usage': r'(\w+) : TTY=([^;]*) ; PWD=([^;]*) ; USER=([^;]*) ; COMMAND=(.+)',
            'failed_su': r'FAILED SU \(to (\w+)\) (\w+) on (\w+)',
            # General patterns
            'timestamp': r'(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})',
            'ip_address': r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
            'error_levels': r'(ERROR|WARN|INFO|DEBUG|FATAL|CRITICAL)',
        }
    def analyze_security_logs(self, log_files: List[str]) -> Dict:
        """Comprehensive security log analysis"""
        results = {
            'failed_logins': defaultdict(int),
            'successful_logins': [],
            'suspicious_ips': set(),
            'brute_force_attempts': {},
            'privilege_escalations': [],
            'geographic_analysis': defaultdict(int)
        }
        for log_file in log_files:
            try:
                self._process_security_log_file(log_file, results)
            except Exception as e:
                logger.error(f"Error processing {log_file}: {e}")
        # Analyze brute force patterns
        results['brute_force_attempts'] = self._detect_brute_force(results['failed_logins'])
        # Merge (not overwrite) IPs already flagged during file processing
        results['suspicious_ips'] |= self._identify_suspicious_ips(results['failed_logins'])
        return results
    def _process_security_log_file(self, log_file: str, results: Dict):
        """Process individual security log file"""
        open_func = gzip.open if log_file.endswith('.gz') else open
        mode = 'rt' if log_file.endswith('.gz') else 'r'
        with open_func(log_file, mode, errors='ignore') as f:
            for line_num, line in enumerate(f, 1):
                try:
                    # Failed SSH attempts
                    match = re.search(self.patterns['ssh_failed'], line)
                    if match:
                        user, ip = match.groups()
                        results['failed_logins'][ip] += 1
                        if self._is_suspicious_login_attempt(user, ip):
                            results['suspicious_ips'].add(ip)
                    # Successful SSH logins
                    match = re.search(self.patterns['ssh_success'], line)
                    if match:
                        user, ip = match.groups()
                        timestamp = self._extract_timestamp(line)
                        results['successful_logins'].append({
                            'user': user,
                            'ip': ip,
                            'timestamp': timestamp,
                            'line_number': line_num
                        })
                    # Sudo usage
                    match = re.search(self.patterns['sudo_usage'], line)
                    if match:
                        user, tty, pwd, target_user, command = match.groups()
                        results['privilege_escalations'].append({
                            'user': user,
                            'target_user': target_user,
                            'command': command,
                            'timestamp': self._extract_timestamp(line)
                        })
                except Exception as e:
                    logger.warning(f"Error processing line {line_num} in {log_file}: {e}")
    def analyze_application_logs(self, log_dir: str, app_name: str) -> Dict:
        """Analyze application-specific logs"""
        log_path = Path(log_dir)
        results = {
            'error_count': 0,
            'warning_count': 0,
            'error_patterns': Counter(),
            'performance_issues': [],
            'timeline': defaultdict(int)
        }
        # Find application log files
        log_files = list(log_path.glob(f"{app_name}*.log*"))
        for log_file in log_files:
            try:
                self._analyze_app_log_file(str(log_file), results)
            except Exception as e:
                logger.error(f"Error analyzing {log_file}: {e}")
        return results

    def _analyze_app_log_file(self, log_file: str, results: Dict):
        """Analyze individual application log file"""
        open_func = gzip.open if log_file.endswith('.gz') else open
        mode = 'rt' if log_file.endswith('.gz') else 'r'
        with open_func(log_file, mode, errors='ignore') as f:
            for line in f:
                # Count error levels
                if re.search(r'ERROR|error', line, re.IGNORECASE):
                    results['error_count'] += 1
                    # Extract error pattern
                    error_msg = self._extract_error_message(line)
                    if error_msg:
                        results['error_patterns'][error_msg] += 1
                elif re.search(r'WARN|warning', line, re.IGNORECASE):
                    results['warning_count'] += 1
                # Performance issues
                if self._is_performance_issue(line):
                    results['performance_issues'].append({
                        'line': line.strip(),
                        'timestamp': self._extract_timestamp(line)
                    })
                # Timeline analysis
                timestamp = self._extract_timestamp(line)
                if timestamp:
                    hour = timestamp.split(':')[0] if ':' in timestamp else timestamp
                    results['timeline'][hour] += 1
    def generate_security_report(self, analysis_results: Dict) -> str:
        """Generate comprehensive security report"""
        report = []
        report.append("SECURITY ANALYSIS REPORT")
        report.append("=" * 50)
        report.append(f"Generated: {datetime.now().isoformat()}\n")
        # Failed login summary
        total_failures = sum(analysis_results['failed_logins'].values())
        report.append(f"Total Failed Login Attempts: {total_failures}")
        report.append(f"Unique IPs with Failed Attempts: {len(analysis_results['failed_logins'])}")
        # Top attacking IPs
        if analysis_results['failed_logins']:
            report.append("\nTop 10 Attacking IPs:")
            sorted_ips = sorted(analysis_results['failed_logins'].items(),
                                key=lambda x: x[1], reverse=True)[:10]
            for ip, count in sorted_ips:
                report.append(f"  {ip}: {count} attempts")
        # Brute force detection
        if analysis_results['brute_force_attempts']:
            report.append("\nBrute Force Attacks Detected:")
            for ip, data in analysis_results['brute_force_attempts'].items():
                report.append(f"  {ip}: {data['attempts']} attempts in {data['timespan']}")
        # Successful logins
        report.append(f"\nSuccessful Logins: {len(analysis_results['successful_logins'])}")
        # Privilege escalations
        report.append(f"Sudo Commands Executed: {len(analysis_results['privilege_escalations'])}")
        return '\n'.join(report)

    def export_to_json(self, analysis_results: Dict, output_file: str):
        """Export analysis results to JSON"""
        # Convert sets to lists for JSON serialization
        json_results = {}
        for key, value in analysis_results.items():
            if isinstance(value, set):
                json_results[key] = list(value)
            elif isinstance(value, defaultdict):
                json_results[key] = dict(value)
            else:
                json_results[key] = value
        with open(output_file, 'w') as f:
            json.dump(json_results, f, indent=2, default=str)
    def _detect_brute_force(self, failed_logins: Dict, threshold: int = 20) -> Dict:
        """Detect brute force attacks"""
        brute_force = {}
        for ip, count in failed_logins.items():
            if count >= threshold:
                brute_force[ip] = {
                    'attempts': count,
                    'timespan': 'unknown',  # Would need timestamp analysis
                    'severity': 'high' if count > 100 else 'medium'
                }
        return brute_force

    def _identify_suspicious_ips(self, failed_logins: Dict) -> set:
        """Identify suspicious IP addresses"""
        suspicious = set()
        for ip, count in failed_logins.items():
            try:
                ip_obj = ipaddress.ip_address(ip)
                # Flag high-frequency attempts
                if count > 50:
                    suspicious.add(ip)
                # Flag repeated failures originating from private (internal) IPs
                if ip_obj.is_private and count > 10:
                    suspicious.add(ip)
            except ValueError:
                continue
        return suspicious

    def _is_suspicious_login_attempt(self, user: str, ip: str) -> bool:
        """Determine if a login attempt targets a commonly attacked account"""
        suspicious_users = ['root', 'admin', 'administrator', 'test', 'oracle']
        return user.lower() in suspicious_users

    def _extract_timestamp(self, line: str) -> Optional[str]:
        """Extract timestamp from log line"""
        match = re.search(self.patterns['timestamp'], line)
        return match.group(1) if match else None

    def _extract_error_message(self, line: str) -> Optional[str]:
        """Extract error message pattern"""
        # Simple extraction - could be enhanced based on log format
        parts = line.split()
        for i, part in enumerate(parts):
            if 'error' in part.lower():
                return ' '.join(parts[i:i + 5])  # Get context around error
        return None

    def _is_performance_issue(self, line: str) -> bool:
        """Detect performance-related issues"""
        performance_keywords = [
            'timeout', 'slow', 'performance', 'latency',
            'memory', 'cpu', 'disk', 'connection pool'
        ]
        return any(keyword in line.lower() for keyword in performance_keywords)
def main():
    """Example usage"""
    analyzer = LogAnalyzer()
    # Analyze security logs
    security_logs = ['/var/log/auth.log', '/var/log/secure']
    existing_logs = [log for log in security_logs if Path(log).exists()]
    if existing_logs:
        results = analyzer.analyze_security_logs(existing_logs)
        report = analyzer.generate_security_report(results)
        print(report)
        # Export to JSON
        analyzer.export_to_json(results, 'security_analysis.json')
    else:
        print("No security log files found")

if __name__ == "__main__":
    main()
# ssh_monitor.py - real-time failed SSH login monitoring (separate script)
import re
import subprocess
import time

def monitor_failed_logins():
    """Monitor for failed SSH login attempts"""
    # Start following the SSH service journal
    proc = subprocess.Popen(
        ['journalctl', '-u', 'ssh', '-f', '--no-pager'],
        stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True)
    failed_pattern = re.compile(r'Failed password.*from (\d+\.\d+\.\d+\.\d+)')
    print("Monitoring failed SSH attempts...")
    try:
        for line in iter(proc.stdout.readline, ''):
            ip_match = failed_pattern.search(line)
            if ip_match:
                ip = ip_match.group(1)
                timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
                print(f"[{timestamp}] FAILED LOGIN from {ip}")
    except KeyboardInterrupt:
        proc.terminate()
Log Rotation Management:
# log_rotation.py
import glob
import gzip
import os
import shutil
from datetime import datetime

def rotate_custom_log(log_file, max_size_mb=100, keep_rotations=5):
    """Simple log rotation for custom applications"""
    if not os.path.exists(log_file):
        return
    # Check file size
    size_mb = os.path.getsize(log_file) / (1024 * 1024)
    if size_mb > max_size_mb:
        # Create rotated filename
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        rotated_file = f"{log_file}.{timestamp}.gz"
        # Compress and rotate
        with open(log_file, 'rb') as f_in:
            with gzip.open(rotated_file, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        # Clear original log
        open(log_file, 'w').close()
        print(f"Rotated {log_file} to {rotated_file}")
        # Clean up old rotations
        cleanup_old_logs(log_file, keep_rotations)

def cleanup_old_logs(log_file, keep_rotations):
    """Delete the oldest rotated copies beyond the retention count"""
    rotated = sorted(glob.glob(f"{log_file}.*.gz"))
    for old_file in rotated[:-keep_rotations]:
        os.remove(old_file)
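The compress-then-truncate step can be exercised safely against a throwaway temp file. This is a minimal demo of the same mechanics, not a replacement for logrotate:

```python
# Self-contained demo of the compress-then-truncate rotation step,
# run against a temp file so nothing real is touched.
import gzip
import os
import shutil
import tempfile

log_file = os.path.join(tempfile.mkdtemp(), "demo.log")
with open(log_file, "w") as f:
    f.write("line one\nline two\n")

rotated_file = log_file + ".1.gz"
with open(log_file, "rb") as f_in, gzip.open(rotated_file, "wb") as f_out:
    shutil.copyfileobj(f_in, f_out)   # write the compressed copy
open(log_file, "w").close()           # truncate the live log in place

print(os.path.getsize(log_file))      # 0
```

Truncating in place (rather than deleting and recreating) matters when the writing process keeps its file descriptor open, which is the same reason logrotate offers `copytruncate`.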
Log Monitoring Best Practices
Essential Monitoring:
# Monitor disk usage (logs can fill disk)
df -h /var/log
# Find large log files
find /var/log -type f -size +100M -exec ls -lh {} \;
# Monitor log growth
watch -n 5 'ls -lah /var/log/syslog'
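The `df -h` check above can be automated with `shutil.disk_usage`; the 90% threshold below is an arbitrary example value:

```python
# Hedged sketch: warn when the filesystem holding the log directory
# crosses a usage threshold, mirroring the `df -h /var/log` check.
import shutil

def log_disk_usage_percent(path: str = "/var/log") -> float:
    """Percentage of the filesystem containing `path` that is in use."""
    usage = shutil.disk_usage(path)
    return usage.used / usage.total * 100

pct = log_disk_usage_percent("/")  # "/" so the sketch runs anywhere
print(f"disk usage: {pct:.1f}%")
if pct > 90:  # example threshold, tune for your environment
    print("WARNING: log filesystem nearly full")
```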
Security Monitoring:
# Monitor failed login attempts
journalctl -u ssh --since today | grep "Failed password"
# Check for privilege escalation
journalctl --since today | grep -i sudo
# Monitor file system events
journalctl -k --since today | grep -i "filesystem\|mount"
Note
DevOps Tip: Set up centralized logging with tools like ELK stack (Elasticsearch, Logstash, Kibana) or use cloud logging services for production environments.
Cheat sheet
# print last 100 lines
tail -n 100 /var/log/messages
# follow log
tail -f /var/log/secure
# every journal entry that is in the system will be displayed
journalctl
# journal entries collected since the most recent reboot
journalctl -b
# display only kernel messages
journalctl -k
# display the last 20 messages
journalctl -n 20
# actively follow the logs as they are being written
journalctl -f
# filter messages by priority
journalctl -p err
# filter messages by the service unit
journalctl -u sshd
journalctl -u crond --since today
# show listing of last logged-in users
last
# show a listing of bad (failed) login attempts
lastb
Warning
The only problem with troubleshooting is that sometimes trouble shoots back.
Questions
1. What is the name of the daemon that controls log files in RHEL?
   a) rsyslogd
   b) syslogd
   c) logd
   d) logrotate
2. Which of the following log files contains information about emails relayed by the local mail server?
   a) /var/log/messages
   b) /var/log/secure
   c) /var/log/cron
   d) /var/log/maillog
3. Which of the following commands can be used to display the last 20 messages from the journal?
   a) journalctl -n 20
   b) journalctl -f
   c) journalctl -p err
   d) journalctl -u sshd
4. Which of the following commands can be used to display the last 20 lines of the /var/log/messages file?
   a) tail -n 20 /var/log/messages
   b) tail -f /var/log/messages
   c) tail -n 20 /var/log/messages
   d) tail -n 20 /var/log/messages
Answers
1. a
2. d
3. a
4. c