CTEs, Joins, and Subsearches
When a single pipeline is not enough, LynxDB supports let-bound Common Table Expressions (CTEs), the join stage, and union for combining data from multiple sources or running multi-step analysis.
Common Table Expressions (CTEs)
CTEs let you define named result sets and reference them later in the query. This is the most powerful way to build complex multi-source queries.
CTE syntax
Define a CTE with let $name = <pipeline>; and reference it with from $name:
let $threats = from idx_backend | where threat_type in ["sqli", "path_traversal"] | keep client_ip, threat_type;
let $logins = from idx_audit | where type == "USER_LOGIN" and res == "failed" | stats count() as failures by src_ip | rename src_ip as client_ip;
from $threats | join type=inner on client_ip with $logins | keep client_ip, threat_type, failures
CTE rules
- CTEs are introduced with the
letkeyword (SPL2's bare$x = ...;form is gone). - CTE names start with
$and are assigned with=. - Each CTE ends with a semicolon
;. - CTEs are evaluated in order, from top to bottom, and can reference earlier CTEs.
- The final query (after all CTE definitions) produces the result.
letexists only at query level — there is no| letstage (useextendfor per-row bindings).
Example: security correlation
Find IPs that triggered threat detection AND had failed logins:
lynxdb query '
let $threats = from main
| where threat_type in ["sqli", "path_traversal"]
| keep client_ip, threat_type;
let $failed_logins = from main
| where type == "USER_LOGIN" and res == "failed"
| stats count() as failures by src_ip
| rename src_ip as client_ip;
from $threats
| join type=inner on client_ip with $failed_logins
| where failures > 5
| keep client_ip, threat_type, failures
| sort -failures'
Note the rename src_ip as client_ip — join keys must carry the same name on both sides.
Example: compare two time periods
Bracket time ranges on from make period comparisons direct:
lynxdb query '
let $current = from main[-1h]
| where level == "error"
| stats count() as current_errors by source;
let $previous = from main[-2h..-1h]
| where level == "error"
| stats count() as previous_errors by source;
from $current
| join type=outer on source with $previous
| extend change_pct = round((current_errors - previous_errors) / previous_errors * 100, 1)
| keep source, current_errors, previous_errors, change_pct
| sort -change_pct'
join
The join stage combines events from two datasets based on shared fields.
Inner join
Keep only events that have a match in both datasets:
lynxdb query 'from nginx
| join type=inner on client_ip with [
from main | where source == "auth" and type == "login"
| keep client_ip, user_id
]
| keep client_ip, user_id, uri, status'
Left join
Keep all events from the left side, with null values for unmatched right-side fields (this is what SPL2's type=outer did):
lynxdb query 'from nginx
| join type=left on client_ip with [
from main | where source == "geo"
| keep client_ip, country, city
]
| keep client_ip, uri, country, city'
Join syntax
| join [type=inner|left|outer|semi|anti|asof] on <field>[, <field>] [tolerance=<duration>] with ($cte | [<pipeline>])
| Parameter | Description |
|---|---|
type | inner (only matches, the default — a plain inner join, never innerunique), left (keep all from primary), outer (full outer join), semi (keep matching primary rows only), anti (keep non-matching primary rows only), or asof (latest secondary row at or before the primary _time) |
on | The field(s) to join on (must exist in both datasets) |
tolerance | Optional maximum lag for type=asof, such as 30s or 5m |
with | The secondary dataset: a $cte reference or a [<pipeline>] in square brackets |
Join on multiple fields
When you need to join on multiple fields, list them separated by commas:
lynxdb query 'from nginx
| join type=inner on host, timestamp with [
from metrics
| keep host, timestamp, cpu_pct, mem_pct
]
| keep host, timestamp, uri, cpu_pct, mem_pct'
union
The union stage appends rows from one or more sub-pipelines to the current result set. It replaces both APPEND and MULTISEARCH from SPL2. Schemas merge by name, with null-padding for columns that only exist on one side:
lynxdb query 'from nginx status>=500 | stats count() as errors by uri
| union [
from nginx | stats count() as total by uri
]'
Use case: combine different aggregations
When you need several aggregations that cannot be combined in a single stats:
lynxdb query 'from main | stats count() as total_events
| union [
from main | where level == "error" | stats count() as total_errors
]
| union [
from nginx | where status >= 500 | stats count() as nginx_5xx
]'
Use case: run several independent analyses (formerly MULTISEARCH)
Tag each branch with extend so the merged rows stay distinguishable:
lynxdb query 'from nginx status>=500 | stats count() as errors, avg(duration_ms) as avg_lat | extend service = "nginx"
| union [from `api-gateway` level=error | stats count() as errors, avg(duration_ms) as avg_lat | extend service = "api-gw"]
| union [from postgres duration_ms>1000 | stats count() as errors, avg(duration_ms) as avg_lat | extend service = "postgres"]
| keep service, errors, avg_lat
| sort -errors'
A single union also accepts multiple sub-pipelines separated by commas: union [<pipeline>], [<pipeline>].
transaction
The transaction stage groups events into transactions (sequences of related events) keyed by shared field values:
lynxdb query 'from `api-gateway`
| transaction session_id startswith=has(_raw, "request started") endswith=has(_raw, "request completed")
| keep session_id, duration, eventcount'
Each transaction row carries the grouping key plus a duration and an eventcount field. maxspan=<duration> caps how long a transaction may run.
Transactions are useful for:
- Grouping request start/end events into sessions
- Computing end-to-end latency across multiple log lines
- Finding incomplete transactions (missing end event)
For time-gap-based session grouping, see the sessionize helper:
lynxdb query 'from main | sessionize maxpause=30m by user_id'
Practical patterns
Find users hitting rate limits AND generating errors
lynxdb query '
let $rate_limited = from main
| where source == "api-gateway" and status == 429
| stats count() as rate_limit_hits by user_id;
let $errors = from main
| where source == "api-gateway" and status >= 500
| stats count() as error_count by user_id;
from $rate_limited
| join type=inner on user_id with $errors
| where rate_limit_hits > 10 and error_count > 5
| keep user_id, rate_limit_hits, error_count
| sort -rate_limit_hits'
Enrich nginx logs with geo data
lynxdb query 'from nginx[-1h] status>=500
| join type=left on client_ip with [
from main | where source == "geoip"
| dedup client_ip
| keep client_ip, country, city
]
| stats count() as count by country, city
| sort -count
| head 20'
Compare error rates across services
lynxdb query 'from main | where source == "nginx" | stats count() as total, count(where status >= 500) as errors | extend service = "nginx"
| union [from main | where source == "api-gateway" | stats count() as total, count(where level == "error") as errors | extend service = "api-gw"]
| union [from main | where source == "postgres" | stats count() as total, count(where level == "error") as errors | extend service = "postgres"]
| extend error_rate = round(errors / total * 100, 2)
| keep service, total, errors, error_rate
| sort -error_rate'
Performance considerations
- join: The right side (
with [...]or$cte) is loaded into memory for the hash join. Keep it small by filtering and aggregating before joining. Avoid joining two large unfiltered datasets. - CTEs: Each CTE is evaluated independently. Use filters and aggregations in CTEs to reduce intermediate result sizes.
- union: Rows stream through; the cost is the sum of the sub-pipelines. Keep each branch as narrow as possible (
keeponly the columns you need).
Next steps
- Search and filter logs -- write effective filters for sub-pipelines
- Run aggregations -- build aggregations for CTE pipelines
- join reference -- full join syntax and options
- union reference -- full union syntax
- transaction reference -- full transaction syntax