Skip to content

NLP Encoder API Reference

LogEncoder

Encodes raw SIEM log strings (Windows Event XML / Sysmon / Metasploit stdout) into dense float32 vectors consumable by the PyTorch LSTM policy.

The encoder is stateless after __init__ — encode() is pure and thread-safe. An LRU-style string cache avoids re-encoding identical log bursts.

Source code in netforge_rl\nlp\log_encoder.py
class LogEncoder:
    """
    Encodes raw SIEM log strings (Windows Event XML / Sysmon / Metasploit stdout)
    into dense float32 vectors consumable by the PyTorch LSTM policy.

    The encoder is stateless after __init__ — encode() is pure and thread-safe.
    An LRU-style string cache avoids re-encoding identical log bursts.
    """

    def __init__(
        self,
        backend: Literal['tfidf', 'transformer'] = 'tfidf',
        cache_size: int = 512,
    ) -> None:
        self.backend = backend
        # md5(text) -> embedding; bounded by _cache_size via _evict_if_full().
        self._cache: dict[str, np.ndarray] = {}
        self._cache_size = cache_size
        self._encoder = self._build_encoder(backend)

    def encode(self, text: str) -> np.ndarray:
        """
        Encode a single SIEM log string to a float32 vector of shape (EMBEDDING_DIM,).

        Returns a zero vector for empty/None inputs.
        """
        if not text or not text.strip():
            return np.zeros(EMBEDDING_DIM, dtype=np.float32)

        # Cache lookup. Hash the FULL text: the digest keeps keys small
        # regardless of input length, whereas keying on a truncated prefix
        # would silently return the wrong cached vector for two logs that
        # only differ after the truncation point.
        cache_key = hashlib.md5(text.encode()).hexdigest()
        if cache_key in self._cache:
            # NOTE: the cached array is returned by reference — callers must
            # treat it as read-only.
            return self._cache[cache_key]

        vec = self._encoder(text)
        self._evict_if_full()
        self._cache[cache_key] = vec
        return vec

    def encode_buffer(self, log_lines: list[str], agg: str = 'mean') -> np.ndarray:
        """
        Encode a list of log lines and aggregate them into a single vector.

        Args:
            log_lines: List of log strings (e.g. last N from siem_log_buffer).
                Non-string entries (legacy dict-format logs) are stringified.
            agg: Aggregation strategy — 'mean' (default) or 'max'. Any other
                value falls back to 'mean'.

        Returns:
            Aggregated float32 vector of shape (EMBEDDING_DIM,).
        """
        if not log_lines:
            return np.zeros(EMBEDDING_DIM, dtype=np.float32)

        # Normalise: convert legacy dict-format log entries to strings
        str_lines = [line if isinstance(line, str) else str(line) for line in log_lines]
        vecs = np.stack([self.encode(line) for line in str_lines])
        if agg == 'max':
            return vecs.max(axis=0).astype(np.float32)
        return vecs.mean(axis=0).astype(np.float32)

    def _build_encoder(self, backend: str):
        """Return a `text -> np.ndarray` callable for the chosen backend."""
        if backend == 'transformer':
            return self._build_transformer()
        return self._build_tfidf()

    def _build_tfidf(self):
        """
        Build a TF-IDF vectorizer fit on the payload library + event templates corpus.
        Projects to EMBEDDING_DIM via truncated SVD (Latent Semantic Analysis).
        """
        from sklearn.feature_extraction.text import TfidfVectorizer
        from sklearn.decomposition import TruncatedSVD
        from sklearn.pipeline import Pipeline
        from sklearn.preprocessing import Normalizer

        corpus = self._build_training_corpus()

        # Character n-grams (3-5, word-boundary aware) are robust to the
        # hex IDs / GUIDs / paths that dominate SIEM log text.
        pipeline = Pipeline(
            [
                (
                    'tfidf',
                    TfidfVectorizer(
                        analyzer='char_wb',
                        ngram_range=(3, 5),
                        max_features=4096,
                        sublinear_tf=True,
                    ),
                ),
                ('svd', TruncatedSVD(n_components=EMBEDDING_DIM, random_state=42)),
                ('norm', Normalizer(norm='l2')),
            ]
        )
        pipeline.fit(corpus)
        logger.info(
            'LogEncoder[tfidf]: fitted on %d corpus documents → %d-dim LSA.',
            len(corpus),
            EMBEDDING_DIM,
        )

        def encode_fn(text: str) -> np.ndarray:
            vec = pipeline.transform([text])[0]
            return vec.astype(np.float32)

        return encode_fn

    def _build_transformer(self):
        """
        Build a sentence-transformers encoder (all-MiniLM-L6-v2, 22MB).
        Projects 384-dim → EMBEDDING_DIM via a fixed random projection matrix.

        Falls back to the TF-IDF backend if sentence-transformers is missing.
        """
        try:
            from sentence_transformers import SentenceTransformer  # type: ignore
            import torch

            model = SentenceTransformer('all-MiniLM-L6-v2')
            model.eval()

            # Fixed random projection: 384 → EMBEDDING_DIM. Seeded so the
            # projection (and thus the embedding space) is reproducible
            # across runs; columns are L2-normalised.
            rng = np.random.default_rng(42)
            proj = rng.standard_normal((384, EMBEDDING_DIM)).astype(np.float32)
            proj /= np.linalg.norm(proj, axis=0, keepdims=True) + 1e-8

            logger.info(
                'LogEncoder[transformer]: loaded all-MiniLM-L6-v2 → %d-dim projection.',
                EMBEDDING_DIM,
            )

            def encode_fn(text: str) -> np.ndarray:
                with torch.no_grad():
                    emb = model.encode(text, convert_to_numpy=True)
                vec = (emb @ proj).astype(np.float32)
                # L2 normalise
                norm = np.linalg.norm(vec)
                return vec / (norm + 1e-8) if norm > 0 else vec

            return encode_fn

        except ImportError:
            logger.warning(
                'LogEncoder: sentence-transformers not installed. '
                'Falling back to TF-IDF backend. '
                'Run: pip install sentence-transformers'
            )
            return self._build_tfidf()

    def _build_training_corpus(self) -> list[str]:
        """
        Assemble a training corpus from:
          1. payload_library.json (Metasploit stdout strings)
          2. Synthetic event template samples

        Always returns at least a minimal non-empty corpus so the TF-IDF
        pipeline has something to fit on.
        """
        corpus: list[str] = []

        # 1. Load payload library
        lib_path = Path(__file__).parent.parent / 'sim2real' / 'payload_library.json'
        if lib_path.exists():
            with open(lib_path) as f:
                lib = json.load(f)
            # Library layout: {action: {outcome: [stdout strings]}} — flatten.
            for action_data in lib.values():
                for outcome_list in action_data.values():
                    corpus.extend(outcome_list)

        # 2. Synthetic template samples (generate 5 of each template type)
        from netforge_rl.siem.event_templates import (
            evid_4624,
            evid_4625,
            evid_4648,
            evid_4688,
            evid_4768,
            evid_4776,
            sysmon_1,
            sysmon_3,
            sysmon_10,
            sysmon_22,
        )

        sample_ips = ['10.0.0.1', '10.0.1.2', '192.168.1.5', '10.0.0.7', '10.0.1.9']
        for src, tgt in zip(sample_ips, reversed(sample_ips)):
            for fn in [evid_4624, evid_4625, evid_4648, evid_4776]:
                corpus.append(fn(src, tgt))
            corpus.append(evid_4688(src, process='mimikatz.exe'))
            corpus.append(evid_4688(src, process='powershell.exe'))
            corpus.append(evid_4768(src, tgt))
            corpus.append(sysmon_1(src, process='powershell.exe'))
            corpus.append(sysmon_3(src, tgt, dst_port=445))
            corpus.append(sysmon_10(src))
            corpus.append(sysmon_22(src))

        if not corpus:
            # Ultimate fallback — at least something to fit on
            corpus = [
                'Windows Event Log',
                'Sysmon Network Connection',
                'LSASS access detected',
            ]

        return corpus

    def _evict_if_full(self) -> None:
        """Evict the oldest quarter of cache entries once the cache is full."""
        if len(self._cache) >= self._cache_size:
            # Evict oldest quarter of entries (FIFO approximation — dicts
            # preserve insertion order). Evict at least one entry so tiny
            # cache sizes (< 4) still bound the cache instead of letting
            # it grow without limit.
            evict_n = max(1, self._cache_size // 4)
            keys = list(self._cache.keys())[:evict_n]
            for k in keys:
                del self._cache[k]

encode

encode(text: str) -> np.ndarray

Encode a single SIEM log string to a float32 vector of shape (EMBEDDING_DIM,).

Returns a zero vector for empty/None inputs.

Source code in netforge_rl\nlp\log_encoder.py
def encode(self, text: str) -> np.ndarray:
    """
    Encode one SIEM log string into a float32 vector of shape (EMBEDDING_DIM,).

    Empty or None input yields an all-zero vector.
    """
    if not text or not text.strip():
        return np.zeros(EMBEDDING_DIM, dtype=np.float32)

    # Key the cache on an md5 digest of the leading 256 chars so keys stay small.
    key = hashlib.md5(text[:256].encode()).hexdigest()
    cached = self._cache.get(key)
    if cached is not None:
        return cached

    encoded = self._encoder(text)
    self._evict_if_full()
    self._cache[key] = encoded
    return encoded

encode_buffer

encode_buffer(
    log_lines: list[str], agg: str = 'mean'
) -> np.ndarray

Encode a list of log lines and aggregate them into a single vector.

Parameters:

Name Type Description Default
log_lines list[str]

List of log strings (e.g. last N from siem_log_buffer).

required
agg str

Aggregation strategy — 'mean' (default) or 'max'.

'mean'

Returns:

Type Description
ndarray

Aggregated float32 vector of shape (EMBEDDING_DIM,).

Source code in netforge_rl\nlp\log_encoder.py
def encode_buffer(self, log_lines: list[str], agg: str = 'mean') -> np.ndarray:
    """
    Encode a batch of log lines and reduce them to a single vector.

    Args:
        log_lines: Log strings (e.g. last N from siem_log_buffer).
        agg: Reduction across lines — 'mean' (default) or 'max'.

    Returns:
        Reduced float32 vector of shape (EMBEDDING_DIM,).
    """
    if not log_lines:
        return np.zeros(EMBEDDING_DIM, dtype=np.float32)

    # Legacy entries may be dicts — coerce everything to str before encoding.
    as_text = [entry if isinstance(entry, str) else str(entry) for entry in log_lines]
    stacked = np.stack([self.encode(entry) for entry in as_text])
    reduced = stacked.max(axis=0) if agg == 'max' else stacked.mean(axis=0)
    return reduced.astype(np.float32)