log-analyzer

active

0xe28b64a440c14031ff0cb1bb1644e9d2dfc0c4925463f965bedfc981f663da19

Parse, search, and analyze application logs: auto-detect format (JSON, syslog, Apache/Nginx, custom), extract error patterns, compute frequency stats, find correlated events, and generate incident summaries. Handles multi-GB logs efficiently via streaming.

Skill body

Log Analyzer

Parse, search, and analyze application logs. Auto-detects format, extracts patterns, computes stats, and generates actionable incident summaries.

Procedure

1. Detect log format

Sample the first 20 lines and classify the format. The script below inspects only the first line:

head -20 "$LOG_PATH" | python3 -c "
import sys, json, re

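lines = sys.stdin.readlines()
if not lines:
    print('unknown')
    sys.exit(0)

# Try JSON: the first line must parse to an object.
# Catch ValueError only: a bare except also swallows the SystemExit raised by sys.exit.
try:
    if isinstance(json.loads(lines[0]), dict):
        print('json')
        sys.exit(0)
except ValueError:
    pass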

# Syslog: 'Mon DD HH:MM:SS hostname process[pid]:'
if re.match(r'^[A-Z][a-z]{2}\s+\d+\s+\d{2}:\d{2}:\d{2}\s+\S+', lines[0]):
    print('syslog')
    sys.exit(0)

# Apache/Nginx combined: '1.2.3.4 - - [DD/Mon/YYYY:HH:MM:SS +ZZZZ]'
if re.match(r'^\d+\.\d+\.\d+\.\d+.*\[.+\]', lines[0]):
    print('apache')
    sys.exit(0)

# ISO timestamp prefix
if re.match(r'^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}', lines[0]):
    print('iso-timestamped')
    sys.exit(0)

print('unknown')
"
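
Because a log can open with a banner or a partial line, a vote across the whole sample may be more robust than the first-line check. The following is a minimal sketch that reuses the same patterns; classify_line, detect_format, and the min_share threshold are hypothetical names and values chosen for illustration.

```python
import json
import re
from collections import Counter

# Same patterns as the first-line check above, tried in the same order.
PATTERNS = [
    ('syslog', re.compile(r'^[A-Z][a-z]{2}\s+\d+\s+\d{2}:\d{2}:\d{2}\s+\S+')),
    ('apache', re.compile(r'^\d+\.\d+\.\d+\.\d+.*\[.+\]')),
    ('iso-timestamped', re.compile(r'^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}')),
]

def classify_line(line):
    """Return the format label for a single line."""
    try:
        if isinstance(json.loads(line), dict):
            return 'json'
    except ValueError:
        pass
    for label, pattern in PATTERNS:
        if pattern.match(line):
            return label
    return 'unknown'

def detect_format(lines, min_share=0.5):
    """Majority vote over the sample; min_share is an assumed threshold."""
    votes = Counter(classify_line(line) for line in lines if line.strip())
    if not votes:
        return 'unknown'
    label, count = votes.most_common(1)[0]
    return label if count / sum(votes.values()) >= min_share else 'unknown'
```

With this variant, a sample of one banner line followed by JSON lines is still classified as json.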

2. Extract structured fields

For each format, parse into a common schema:

# Common log record
record = {
    'timestamp': None,    # ISO 8601
    'level': None,        # ERROR, WARN, INFO, DEBUG
    'source': None,       # process, service, or filename
    'message': '',        # the log message
    'metadata': {}        # extra fields (request_id, user_id, etc.)
}

JSON logs: Direct field extraction. Common schemas: Bunyan, Pino, Winston, structlog, serilog.
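
As a rough illustration of direct field extraction, the sketch below maps a JSON line onto the common record. The candidate key lists are assumptions and not exhaustive. The numeric level table follows the Bunyan/Pino convention, and numeric timestamps are assumed to be epoch milliseconds. parse_json_line and first_present are hypothetical helper names.

```python
import json
from datetime import datetime, timezone

# Numeric levels used by Bunyan and Pino.
NUMERIC_LEVELS = {10: 'TRACE', 20: 'DEBUG', 30: 'INFO', 40: 'WARN', 50: 'ERROR', 60: 'FATAL'}
# Candidate keys per field, checked in order (assumed, non-exhaustive).
TIME_KEYS = ('timestamp', 'time', '@timestamp', 'ts')
LEVEL_KEYS = ('level', 'severity', 'levelname')
MESSAGE_KEYS = ('message', 'msg', 'event')
SOURCE_KEYS = ('name', 'logger', 'service')

def first_present(obj, keys):
    """Return (key, value) for the first key found in obj, else (None, None)."""
    for key in keys:
        if key in obj:
            return key, obj[key]
    return None, None

def parse_json_line(line):
    """Map one JSON log line onto the common record schema."""
    obj = json.loads(line)
    used = set()

    def take(keys):
        key, value = first_present(obj, keys)
        used.add(key)
        return value

    ts, level = take(TIME_KEYS), take(LEVEL_KEYS)
    source, message = take(SOURCE_KEYS), take(MESSAGE_KEYS)
    if isinstance(ts, (int, float)):  # assumed to be epoch milliseconds
        ts = datetime.fromtimestamp(ts / 1000, tz=timezone.utc).isoformat()
    if isinstance(level, int):
        level = NUMERIC_LEVELS.get(level, str(level))
    return {
        'timestamp': ts,
        'level': level.upper() if isinstance(level, str) else None,
        'source': source,
        'message': message or '',
        'metadata': {k: v for k, v in obj.items() if k not in used},
    }
```

Any key that is not mapped to a schema field ends up in metadata, so fields such as request_id survive the conversion.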

Syslog: Parse with the regex ^(\w{3}\s+\d+\s+[\d:]+)\s+(\S+)\s+(\S+?)(?:\[(\d+)\])?:\s+(.*)$ whose groups capture the timestamp, host, process, optional PID, and message.
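
One way to apply the syslog regex is sketched below. The traditional syslog timestamp carries no year, so this sketch takes it as a parameter. parse_syslog_line is a hypothetical name, and month parsing via %b assumes an English locale.

```python
import re
from datetime import datetime

SYSLOG_RE = re.compile(r'^(\w{3}\s+\d+\s+[\d:]+)\s+(\S+)\s+(\S+?)(?:\[(\d+)\])?:\s+(.*)$')

def parse_syslog_line(line, year):
    """Parse one syslog line into the common record, or return None if it does not match."""
    m = SYSLOG_RE.match(line.rstrip('\n'))
    if not m:
        return None
    stamp, host, process, pid, message = m.groups()
    # Collapse the padded day ('Jun  4') to a single space before parsing.
    compact = ' '.join(stamp.split())
    ts = datetime.strptime(f'{year} {compact}', '%Y %b %d %H:%M:%S')
    return {
        'timestamp': ts.isoformat(),
        'level': None,  # this line format carries no level field
        'source': process,
        'message': message,
        'metadata': {'host': host, 'pid': int(pid) if pid else None},
    }
```

The resulting timestamp has no offset, so the timezone pitfall listed later applies to it.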

Apache/Nginx: Parse the combined log format with a regex and extract IP, method, path, status, size, referrer, and user-agent.
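
A possible regex for the combined log format is sketched below. The pattern and the parse_combined_line name are illustrative, not taken from the skill. Access logs carry no log level, so deriving one from the status code is an assumption of this sketch.

```python
import re
from datetime import datetime

COMBINED_RE = re.compile(
    r'^(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+)(?: (?P<protocol>[^"]*))?" '
    r'(?P<status>\d{3}) (?P<size>\d+|-)'
    r'(?: "(?P<referrer>[^"]*)" "(?P<user_agent>[^"]*)")?'
)

def parse_combined_line(line):
    """Parse one access-log line into the common record, or return None."""
    m = COMBINED_RE.match(line)
    if not m:
        return None
    f = m.groupdict()
    ts = datetime.strptime(f['time'], '%d/%b/%Y:%H:%M:%S %z')
    status = int(f['status'])
    return {
        'timestamp': ts.isoformat(),
        # Assumed mapping: 5xx -> ERROR, 4xx -> WARN, everything else -> INFO.
        'level': 'ERROR' if status >= 500 else 'WARN' if status >= 400 else 'INFO',
        'source': None,
        'message': f"{f['method']} {f['path']} -> {status}",
        'metadata': {
            'ip': f['ip'],
            'status': status,
            'size': 0 if f['size'] == '-' else int(f['size']),
            'referrer': f['referrer'],
            'user_agent': f['user_agent'],
        },
    }
```

The trailing referrer and user-agent group is optional, so lines in the shorter common log format also match.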

3. Filter by time range

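import re
from datetime import datetime, timedelta, timezone

def parse_time_range(spec):
    """Parse 'last 1h', 'last 30m', '2026-06-04 00:00 to 01:00' into a UTC (start, end) pair."""
    if spec.startswith('last'):
        m = re.search(r'(\d+)\s*([hmd])', spec)
        if not m:
            raise ValueError(f'unrecognized time range: {spec!r}')
        unit = {'h': 'hours', 'm': 'minutes', 'd': 'days'}[m.group(2)]
        now = datetime.now(timezone.utc)  # read the clock once so both ends agree
        return (now - timedelta(**{unit: int(m.group(1))}), now)
    # Range: "START to END"; END may omit the date and inherit it from START
    start_s, end_s = (p.strip() for p in spec.split(' to ', 1))
    if re.fullmatch(r'\d{2}:\d{2}(:\d{2})?', end_s):
        end_s = f'{start_s[:10]} {end_s}'
    start, end = datetime.fromisoformat(start_s), datetime.fromisoformat(end_s)
    # Assume UTC when no explicit offset is given
    if start.tzinfo is None:
        start = start.replace(tzinfo=timezone.utc)
    if end.tzinfo is None:
        end = end.replace(tzinfo=timezone.utc)
    return (start, end)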
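
Once a range is known, records can be filtered lazily. The sketch below assumes record timestamps are ISO 8601 strings as in the common schema and treats values without an offset as UTC. filter_by_range and as_utc are hypothetical helpers.

```python
from datetime import datetime, timezone

def as_utc(dt):
    """Attach UTC to naive datetimes; leave aware ones unchanged."""
    return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)

def filter_by_range(records, start, end):
    """Yield records whose timestamp lies in [start, end]; skip unparseable ones."""
    start, end = as_utc(start), as_utc(end)
    for record in records:
        try:
            # Rewrite a trailing 'Z' for Python versions whose fromisoformat rejects it.
            ts = as_utc(datetime.fromisoformat(record['timestamp'].replace('Z', '+00:00')))
        except (TypeError, ValueError, AttributeError):
            continue
        if start <= ts <= end:
            yield record
```

Because it is a generator, it composes with a streaming parser without holding all records in memory.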

4. Error pattern analysis

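import re
from collections import Counter

error_messages = Counter()
errors_per_minute = Counter()  # 'YYYY-MM-DDTHH:MM' -> error count

for record in filtered_records:
    if (record['level'] or '').upper() in ('ERROR', 'FATAL', 'CRITICAL'):
        # Normalize: strip variable parts (timestamps, IDs, paths)
        normalized = re.sub(r'\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}\S*', '<TS>', record['message'])
        # Word boundaries keep hex-looking fragments inside ordinary words intact
        normalized = re.sub(r'\b[0-9a-f]{8,}\b', '<ID>', normalized)
        normalized = re.sub(r'/\S+', '<PATH>', normalized)
        error_messages[normalized] += 1
        # Bucket by minute; assumes 'timestamp' is an ISO 8601 string
        if record['timestamp']:
            errors_per_minute[record['timestamp'][:16]] += 1

error_timeline = sorted(errors_per_minute.items())  # (minute, count) pairs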

5. Correlation detection

Look for events that cluster within a time window:

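from collections import Counter, defaultdict
from datetime import datetime
from itertools import combinations

def find_correlations(records, window_seconds=60):
    """Rank message pairs by the number of time windows in which both appear."""
    windows = defaultdict(set)  # window index -> messages seen in that window
    for r in records:  # assumes ISO 8601 string timestamps
        ts = datetime.fromisoformat(r['timestamp'].replace('Z', '+00:00')).timestamp()
        windows[int(ts // window_seconds)].add(r['message'])
    pairs = Counter(p for msgs in windows.values() for p in combinations(sorted(msgs), 2))
    return pairs.most_common()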

Patterns to detect:

  • Cascading failures: Error A always precedes Error B by N seconds
  • Retry storms: Same error repeated with exponential backoff pattern
  • Resource exhaustion: Increasing latency followed by OOM/timeout
  • Deployment correlation: New errors starting at a specific timestamp
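
For the cascading-failure pattern, one simple measure is how often event B follows event A within the window, and with what typical lag. The sketch below is one possible definition, not the skill's own method: lag_correlation is a hypothetical name, inputs are assumed to be sorted epoch seconds, and "confidence" is taken to mean the share of A occurrences followed by a B.

```python
from bisect import bisect_right
from statistics import median

def lag_correlation(times_a, times_b, window_seconds=60):
    """Estimate how often event B follows event A within the window.

    times_a, times_b: sorted lists of epoch seconds.
    Returns (confidence, median_lag). confidence is the share of A occurrences
    followed by at least one B within window_seconds; median_lag is None when
    no A is followed by a B.
    """
    if not times_a:
        return 0.0, None
    lags = []
    for t in times_a:
        i = bisect_right(times_b, t)  # index of the first B strictly after this A
        if i < len(times_b) and times_b[i] - t <= window_seconds:
            lags.append(times_b[i] - t)
    return len(lags) / len(times_a), (median(lags) if lags else None)
```

A high confidence with a stable lag hints at a cascade but does not prove causation; both events may share an upstream cause.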

6. For large files (>100MB)

# Stream processing — never load entire file into memory
# Count errors
grep -c -iE 'error|exception|fatal|panic' "$LOG_PATH"

# Extract error lines with context
grep -n -B2 -A5 -iE 'error|exception|fatal|panic' "$LOG_PATH" | head -500

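# Time-bounded extraction (only for sortable ISO 8601 timestamps at line start)
# Compare just the timestamp prefix; comparing the whole line drops lines stamped exactly $END_TS
awk -v start="$START_TS" -v end="$END_TS" 'substr($0, 1, length(start)) >= start && substr($0, 1, length(end)) <= end' "$LOG_PATH"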
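
The same streaming idea carries over to Python when parsing has to happen in-process. This is a minimal sketch: open_log and count_errors are hypothetical helpers, and detecting gzip by the .gz suffix is an assumption.

```python
import gzip
import re
from collections import Counter

ERROR_RE = re.compile(r'error|exception|fatal|panic', re.IGNORECASE)

def open_log(path):
    """Open plain or gzip-compressed logs as text; undecodable bytes are replaced."""
    opener = gzip.open if str(path).endswith('.gz') else open
    return opener(path, 'rt', encoding='utf-8', errors='replace')

def count_errors(path):
    """Stream the file once, holding only one line in memory at a time."""
    counts = Counter()
    with open_log(path) as f:
        for line in f:
            counts['total'] += 1
            if ERROR_RE.search(line):
                counts['errors'] += 1
    return counts
```

Iterating over the file object reads line by line, so memory use stays flat regardless of file size.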

7. Generate report

{
  "format_detected": "json",
  "time_range": {"from": "2026-06-04T00:00:00Z", "to": "2026-06-04T01:00:00Z"},
  "total_lines": 45230,
  "error_count": 127,
  "warning_count": 892,
  "top_errors": [
    {"pattern": "Connection refused to <PATH>", "count": 89, "first_seen": "00:12:33", "last_seen": "00:58:01"},
    {"pattern": "Timeout after 30s waiting for <ID>", "count": 23, "first_seen": "00:15:00", "last_seen": "00:55:44"}
  ],
  "timeline": [
    {"minute": "00:12", "errors": 3, "warnings": 12},
    {"minute": "00:13", "errors": 15, "warnings": 45}
  ],
  "correlations": [
    {"event_a": "Connection refused", "event_b": "Circuit breaker opened", "lag_seconds": 5, "confidence": 0.94}
  ],
  "summary": "127 errors in 1h, dominated by connection refusals (70%) starting at 00:12. Correlates with circuit breaker activations. Suggests downstream service outage."
}
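
Part of such a report can be assembled from the counters built in the earlier steps. The sketch below covers only a subset of the fields. build_report and its parameters are hypothetical, and the summary wording is illustrative.

```python
from collections import Counter

def build_report(format_detected, total_lines, level_counts, error_messages, top_n=5):
    """Assemble a subset of the report fields from already-computed counts.

    level_counts: mapping of level name to line count.
    error_messages: Counter of normalized error patterns.
    """
    error_count = sum(level_counts.get(level, 0) for level in ('ERROR', 'FATAL', 'CRITICAL'))
    top = error_messages.most_common(top_n)
    report = {
        'format_detected': format_detected,
        'total_lines': total_lines,
        'error_count': error_count,
        'warning_count': level_counts.get('WARN', 0) + level_counts.get('WARNING', 0),
        'top_errors': [{'pattern': p, 'count': c} for p, c in top],
    }
    if top and error_count:
        pattern, count = top[0]
        share = round(100 * count / error_count)
        report['summary'] = f'{error_count} errors, dominated by "{pattern}" ({share}%).'
    else:
        report['summary'] = 'No errors in the selected range.'
    return report
```

The result is a plain dict, so json.dumps(report, indent=2) produces output in the shape shown above.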

Pitfalls

  • Compressed logs (.gz): decompress with zcat or gzip -dc before parsing
  • Multi-line stack traces: join continuation lines (those starting with whitespace) to the previous line
  • Timezone ambiguity: assume UTC unless log contains explicit offset
  • Binary/corrupted lines: skip them and keep a count of skipped lines; don't crash
  • Docker JSON logs: unwrap the Docker JSON envelope first
  • journalctl: use --output=json for structured access
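
Two of these pitfalls can be handled with small pre-processing helpers. The sketch below makes two assumptions: continuation lines start with a space or tab, and Docker lines use the json-file envelope with log and stream keys. join_continuations and unwrap_docker are hypothetical names.

```python
import json

def join_continuations(lines):
    """Merge lines that start with whitespace into the preceding line."""
    current = None
    for line in lines:
        line = line.rstrip('\n')
        if current is not None and line[:1] in (' ', '\t'):
            current += '\n' + line
        else:
            if current is not None:
                yield current
            current = line
    if current is not None:
        yield current

def unwrap_docker(line):
    """Return the inner log line from a Docker json-file envelope, else the line unchanged."""
    try:
        envelope = json.loads(line)
    except ValueError:
        return line
    if isinstance(envelope, dict) and 'log' in envelope and 'stream' in envelope:
        return envelope['log'].rstrip('\n')
    return line
```

The whitespace rule misses unindented continuation lines such as "Caused by:" in Java traces or the final exception line of a Python traceback, so those need format-specific handling.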