log-analyzer
Parse, search, and analyze application logs: auto-detect format (JSON, syslog, Apache/Nginx, custom), extract error patterns, compute frequency stats, find correlated events, and generate incident summaries. Handles multi-GB logs efficiently via streaming.
Log Analyzer
Parse, search, and analyze application logs. Auto-detects format, extracts patterns, computes stats, and generates actionable incident summaries.
Procedure
1. Detect log format
Sample the first 20 lines and classify:
head -20 "$LOG_PATH" | python3 -c "
import sys, json, re

# Classify on the first non-empty sampled line
lines = [l for l in sys.stdin if l.strip()]
if not lines:
    print('unknown')
    sys.exit(0)
first = lines[0]

# Try JSON (one object per line).
# Catch ValueError only: a bare except would also swallow the SystemExit
# raised by sys.exit() and fall through to the checks below.
try:
    if isinstance(json.loads(first), dict):
        print('json')
        sys.exit(0)
except ValueError:
    pass

# Syslog: 'Mon DD HH:MM:SS hostname process[pid]:'
if re.match(r'^[A-Z][a-z]{2}\s+\d+\s+\d{2}:\d{2}:\d{2}\s+\S+', first):
    print('syslog')
    sys.exit(0)

# Apache/Nginx combined: '1.2.3.4 - - [DD/Mon/YYYY:HH:MM:SS +ZZZZ]'
if re.match(r'^\d+\.\d+\.\d+\.\d+.*\[.+\]', first):
    print('apache')
    sys.exit(0)

# ISO timestamp prefix
if re.match(r'^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}', first):
    print('iso-timestamped')
    sys.exit(0)

print('unknown')
"
2. Extract structured fields
For each format, parse into a common schema:
# Common log record
record = {
    'timestamp': None,  # ISO 8601
    'level': None,      # ERROR, WARN, INFO, DEBUG
    'source': None,     # process, service, or filename
    'message': '',      # the log message
    'metadata': {}      # extra fields (request_id, user_id, etc.)
}
JSON logs: Direct field extraction. Common schemas: Bunyan, Pino, Winston, structlog, Serilog.
Syslog: Regex parse ^(\w{3}\s+\d+\s+[\d:]+)\s+(\S+)\s+(\S+?)(?:\[(\d+)\])?:\s+(.*)$
Apache/Nginx: Regex for combined log format. Extract IP, method, path, status, size, referrer, user-agent.
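As a rough illustration of mapping these two formats onto the common record, the sketch below reuses the syslog regex given above and adds one possible pattern for the combined log format. The function names parse_syslog and parse_combined, the year parameter (syslog timestamps carry no year), the metadata key names, and the status-to-level mapping are all assumptions made for this example. Month-name parsing with %b also assumes an English locale.

```python
import re
from datetime import datetime, timezone

# Syslog pattern taken from the text above.
SYSLOG_RE = re.compile(
    r"^(\w{3}\s+\d+\s+[\d:]+)\s+(\S+)\s+(\S+?)(?:\[(\d+)\])?:\s+(.*)$"
)

# One possible combined-format pattern:
# IP, identity, user, [time], "request", status, size, "referrer", "user-agent"
COMBINED_RE = re.compile(
    r'^(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) (\S+)[^"]*" (\d{3}) (\S+) "([^"]*)" "([^"]*)"'
)


def parse_syslog(line, year):
    """Parse one syslog line into the common record, or return None."""
    m = SYSLOG_RE.match(line)
    if not m:
        return None
    stamp, host, process, pid, message = m.groups()
    # Syslog omits the year, so the caller supplies it (assumption).
    ts = datetime.strptime(f"{year} {stamp}", "%Y %b %d %H:%M:%S")
    return {
        "timestamp": ts.replace(tzinfo=timezone.utc).isoformat(),  # assume UTC
        "level": None,  # classic syslog lines carry no level in this position
        "source": process,
        "message": message,
        "metadata": {"host": host, "pid": pid},
    }


def parse_combined(line):
    """Parse one combined-format access log line, or return None."""
    m = COMBINED_RE.match(line)
    if not m:
        return None
    ip, stamp, method, path, status, size, referrer, agent = m.groups()
    ts = datetime.strptime(stamp, "%d/%b/%Y:%H:%M:%S %z")
    status = int(status)
    # Assumed mapping: 5xx -> ERROR, 4xx -> WARN, everything else -> INFO.
    level = "ERROR" if status >= 500 else "WARN" if status >= 400 else "INFO"
    return {
        "timestamp": ts.isoformat(),
        "level": level,
        "source": None,
        "message": f"{method} {path} {status}",
        "metadata": {
            "client_ip": ip,
            "method": method,
            "path": path,
            "status": status,
            "size": None if size == "-" else int(size),
            "referrer": referrer,
            "user_agent": agent,
        },
    }
```

Both functions return None for lines that do not match, so callers can count and skip malformed input instead of raising.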
3. Filter by time range
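import re
from datetime import datetime, timedelta, timezone

def parse_time_range(spec):
    """Parse 'last 1h', 'last 30m', '2026-06-04 00:00 to 01:00'"""
    spec = spec.strip()
    m = re.match(r'last\s+(\d+)\s*([hmd])', spec)
    if m:
        unit = {'h': 'hours', 'm': 'minutes', 'd': 'days'}[m.group(2)]
        now = datetime.now(timezone.utc)  # read the clock once so both ends agree
        return (now - timedelta(**{unit: int(m.group(1))}), now)
    # Range: "START to END"; END may omit the date and reuse START's
    start_s, end_s = (p.strip() for p in spec.split(' to ', 1))
    start = datetime.fromisoformat(start_s)
    if re.fullmatch(r'\d{2}:\d{2}(:\d{2})?', end_s):
        end_s = f'{start.date()} {end_s}'
    end = datetime.fromisoformat(end_s)
    # Treat naive timestamps as UTC (see Pitfalls)
    as_utc = lambda dt: dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
    return (as_utc(start), as_utc(end))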
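Once a range is available, applying it to parsed records might look like the sketch below. The helper names to_utc and filter_by_time are hypothetical, and the code assumes each record's timestamp is either an ISO 8601 string or a datetime, with naive values treated as UTC. Note that datetime.fromisoformat only accepts a trailing Z from Python 3.11 onward, so the example uses explicit offsets.

```python
from datetime import datetime, timezone


def to_utc(value):
    """Coerce an ISO 8601 string or datetime to an aware UTC datetime."""
    dt = datetime.fromisoformat(value) if isinstance(value, str) else value
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    return dt.astimezone(timezone.utc)


def filter_by_time(records, start, end):
    """Yield records whose timestamp falls within [start, end], inclusive."""
    start, end = to_utc(start), to_utc(end)
    for record in records:
        if record.get("timestamp") is None:
            continue  # unparsed timestamp: skip rather than guess
        if start <= to_utc(record["timestamp"]) <= end:
            yield record
```

Because it is a generator, the filter can sit between a streaming parser and the analysis steps without holding the whole log in memory.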
4. Error pattern analysis
import re
from collections import Counter

error_messages = Counter()
error_timeline = Counter()  # errors per minute, keyed by 'YYYY-MM-DD HH:MM'

for record in filtered_records:
    if record['level'] in ('ERROR', 'FATAL', 'CRITICAL'):
        # Normalize: strip variable parts (IDs, timestamps, paths)
        normalized = re.sub(r'[0-9a-f]{8,}', '<ID>', record['message'])
        normalized = re.sub(r'\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}\S*', '<TS>', normalized)
        normalized = re.sub(r'/\S+', '<PATH>', normalized)
        error_messages[normalized] += 1
        # Bucket by minute; works for ISO 8601 strings and datetime objects
        minute = str(record['timestamp'])[:16].replace('T', ' ')
        error_timeline[minute] += 1
5. Correlation detection
Look for events that cluster within a time window:
def find_correlations(records, window_seconds=60):
    """Find events that consistently appear together within a time window."""
    # Group by minute
    # For each error, find what other log lines appear in the same window
    # Rank by co-occurrence frequency
    pass
Patterns to detect:
- Cascading failures: Error A always precedes Error B by N seconds
- Retry storms: Same error repeated with exponential backoff pattern
- Resource exhaustion: Increasing latency followed by OOM/timeout
- Deployment correlation: New errors starting at a specific timestamp
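One possible way to fill in the find_correlations stub is sketched below. It is a simplification under stated assumptions: records are grouped into fixed (not sliding) buckets of window_seconds, messages are assumed to be normalized already, timestamps are ISO 8601 strings or datetime objects, and "confidence" is defined here as the share of buckets containing either event that contain both. The min_support parameter and that confidence definition are choices made for this example, not something the procedure prescribes. Lag estimation for cascading failures is left out.

```python
from collections import Counter, defaultdict
from datetime import datetime
from itertools import combinations


def find_correlations(records, window_seconds=60, min_support=2):
    """Rank message pairs by how often they share a fixed time bucket."""
    buckets = defaultdict(set)
    for record in records:
        ts = record["timestamp"]
        if isinstance(ts, str):
            ts = datetime.fromisoformat(ts)
        buckets[int(ts.timestamp()) // window_seconds].add(record["message"])

    seen = Counter()   # number of buckets in which each message appears
    pairs = Counter()  # number of buckets in which both messages appear
    for messages in buckets.values():
        seen.update(messages)
        pairs.update(combinations(sorted(messages), 2))

    results = []
    for (a, b), both in pairs.items():
        if both < min_support:
            continue  # ignore pairs seen together too rarely
        either = seen[a] + seen[b] - both
        results.append({
            "event_a": a,
            "event_b": b,
            "co_occurrences": both,
            "confidence": round(both / either, 2),
        })
    results.sort(key=lambda r: (-r["confidence"], -r["co_occurrences"]))
    return results
```

Fixed buckets are cheap but can split two related events that straddle a bucket boundary; a sliding window avoids that at the cost of more bookkeeping.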
6. For large files (>100MB)
# Stream processing — never load entire file into memory
# Count errors
grep -c -iE 'error|exception|fatal|panic' "$LOG_PATH"
# Extract error lines with context
grep -n -B2 -A5 -iE 'error|exception|fatal|panic' "$LOG_PATH" | head -500
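# Time-bounded extraction (assumes a sortable timestamp such as ISO 8601 at line start,
# with START_TS and END_TS written in the same format as the log)
awk -v start="$START_TS" -v end="$END_TS" '{ ts = substr($0, 1, length(start)) } ts >= start && ts <= end' "$LOG_PATH"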
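The same streaming idea can be expressed in Python when the analysis steps need parsed records rather than raw grep output. The sketch below is a minimal example; the names stream_lines and count_errors are hypothetical, and the choice to replace undecodable bytes instead of failing is an assumption.

```python
import gzip
import re

# Same alternation as the grep commands above, case-insensitive.
ERROR_RE = re.compile(r"error|exception|fatal|panic", re.IGNORECASE)


def stream_lines(path):
    """Yield lines one at a time; reads .gz files transparently."""
    opener = gzip.open if str(path).endswith(".gz") else open
    # errors="replace" keeps corrupted bytes from aborting the run.
    with opener(path, "rt", encoding="utf-8", errors="replace") as handle:
        for line in handle:
            yield line.rstrip("\n")


def count_errors(path):
    """Count matching lines without loading the whole file into memory."""
    return sum(1 for line in stream_lines(path) if ERROR_RE.search(line))
```

Iterating over the file handle reads one line at a time, so memory use stays roughly constant regardless of file size.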
7. Generate report
{
  "format_detected": "json",
  "time_range": {"from": "2026-06-04T00:00:00Z", "to": "2026-06-04T01:00:00Z"},
  "total_lines": 45230,
  "error_count": 127,
  "warning_count": 892,
  "top_errors": [
    {"pattern": "Connection refused to <PATH>", "count": 89, "first_seen": "00:12:33", "last_seen": "00:58:01"},
    {"pattern": "Timeout after 30s waiting for <ID>", "count": 23, "first_seen": "00:15:00", "last_seen": "00:55:44"}
  ],
  "timeline": [
    {"minute": "00:12", "errors": 3, "warnings": 12},
    {"minute": "00:13", "errors": 15, "warnings": 45}
  ],
  "correlations": [
    {"event_a": "Connection refused", "event_b": "Circuit breaker opened", "lag_seconds": 5, "confidence": 0.94}
  ],
  "summary": "127 errors in 1h, dominated by connection refusals (70%) starting at 00:12. Correlates with circuit breaker activations. Suggests downstream service outage."
}
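A partial sketch of how the counting fields of such a report could be assembled is shown below. The function name build_report and the top_n parameter are hypothetical. It assumes records follow the common schema, that error messages are already normalized into patterns, and that timestamps are ISO 8601 strings in one consistent format so that string comparison orders them correctly. The timeline, correlations, and summary fields are not covered here.

```python
from collections import Counter

ERROR_LEVELS = ("ERROR", "FATAL", "CRITICAL")


def build_report(format_detected, records, top_n=5):
    """Assemble the counting fields of the report from parsed records."""
    levels = Counter(r["level"] for r in records)
    stats = {}  # pattern -> [count, first_seen, last_seen]
    for r in records:
        if r["level"] not in ERROR_LEVELS:
            continue
        entry = stats.setdefault(r["message"], [0, r["timestamp"], r["timestamp"]])
        entry[0] += 1
        entry[1] = min(entry[1], r["timestamp"])
        entry[2] = max(entry[2], r["timestamp"])
    # Most frequent patterns first.
    top = sorted(stats.items(), key=lambda kv: -kv[1][0])[:top_n]
    return {
        "format_detected": format_detected,
        "total_lines": len(records),
        "error_count": sum(levels[level] for level in ERROR_LEVELS),
        "warning_count": levels["WARN"],
        "top_errors": [
            {"pattern": p, "count": c, "first_seen": first, "last_seen": last}
            for p, (c, first, last) in top
        ],
    }
```

The returned dict contains only JSON-serializable values, so it can be passed straight to json.dumps.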
Pitfalls
- Compressed logs (.gz): decompress with zcat or gzip -dc before parsing
- Multi-line stack traces: join continuation lines (those starting with whitespace) with the previous line
- Timezone ambiguity: assume UTC unless log contains explicit offset
- Binary/corrupted lines: skip with error count, don't crash
- Docker JSON logs: unwrap the Docker JSON envelope first
- journalctl: use --output=json for structured access
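Two of these pitfalls lend themselves to small helpers, sketched below with hypothetical names join_multiline and unwrap_docker. The first applies the leading-whitespace rule from the list above, which suits Java-style stack frames but will not catch continuation lines that start in column zero. The second assumes the json-file driver's envelope with log, stream, and time keys.

```python
import json


def join_multiline(lines):
    """Merge continuation lines (leading whitespace) into the preceding entry."""
    entry = None
    for line in lines:
        line = line.rstrip("\n")
        if line[:1].isspace() and entry is not None:
            entry += "\n" + line  # continuation of the current entry
        else:
            if entry is not None:
                yield entry
            entry = line
    if entry is not None:
        yield entry


def unwrap_docker(line):
    """Return (time, stream, message) from one Docker json-file line, or None."""
    try:
        envelope = json.loads(line)
    except ValueError:
        return None  # not JSON: let the caller count and skip it
    if not isinstance(envelope, dict) or "log" not in envelope:
        return None
    return envelope.get("time"), envelope.get("stream"), envelope["log"].rstrip("\n")
```

Unwrapping should happen before format detection, since the inner message may itself be JSON, syslog, or plain text.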