Apache Druid

SQL query translation

Apache Druid supports two query languages: Druid SQL and native queries. This document describes how Druid translates SQL queries into native queries.

Druid uses Apache Calcite to parse and plan SQL queries, and translates SQL statements into its native JSON-based query language. In general, the only performance penalty of using Druid SQL instead of native queries is the slight overhead of translating SQL on the Broker.

This topic includes best practices and tools to help you achieve good performance and minimize the impact of translation.

Best practices

Consider this (non-exhaustive) list of things to check when investigating how the translation of your SQL queries to native queries affects performance.

  1. If you wrote a filter on the primary time column __time, make sure it is being correctly translated to an "intervals" filter, as described in the Time filters section below. If not, you may need to change the way you write the filter.

  2. Try to avoid subqueries underneath joins: they affect both performance and scalability. This includes implicit subqueries generated by conditions on mismatched types, and implicit subqueries generated by conditions that use expressions to refer to the right-hand side.

  3. Currently, Druid does not support pushing down predicates (conditions and filters) past a join, that is, into the join's children. Druid only supports pushing predicates into the join if they originated from above the join. The location of predicates and filters in your Druid SQL therefore matters a great deal. For the same reason, avoid comma joins.

  4. Read through the Query execution page to understand how various types of native queries will be executed.

  5. Be careful when interpreting EXPLAIN PLAN output, and use request logging if in doubt. Request logs will show the exact native query that was run. See the next section for more details.

  6. If you encounter a query that could be planned better, feel free to raise an issue on GitHub. A reproducible test case is always appreciated.

Interpreting EXPLAIN PLAN output

The EXPLAIN PLAN functionality can help you understand how a given SQL query will be translated to native. EXPLAIN PLAN statements return a RESOURCES column that describes the resource being queried as well as a PLAN column that contains a JSON array of native queries that Druid will run. For example, consider the following query:

EXPLAIN PLAN FOR
SELECT
  channel,
  COUNT(*)
FROM wikipedia
WHERE channel IN (SELECT page FROM wikipedia GROUP BY page ORDER BY COUNT(*) DESC LIMIT 10)
GROUP BY channel

The EXPLAIN PLAN statement returns the following plan:

[
  {
    "query": {
      "queryType": "topN",
      "dataSource": {
        "type": "join",
        "left": {
          "type": "table",
          "name": "wikipedia"
        },
        "right": {
          "type": "query",
          "query": {
            "queryType": "groupBy",
            "dataSource": {
              "type": "table",
              "name": "wikipedia"
            },
            "intervals": {
              "type": "intervals",
              "intervals": [
                "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"
              ]
            },
            "granularity": {
              "type": "all"
            },
            "dimensions": [
              {
                "type": "default",
                "dimension": "page",
                "outputName": "d0",
                "outputType": "STRING"
              }
            ],
            "aggregations": [
              {
                "type": "count",
                "name": "a0"
              }
            ],
            "limitSpec": {
              "type": "default",
              "columns": [
                {
                  "dimension": "a0",
                  "direction": "descending",
                  "dimensionOrder": {
                    "type": "numeric"
                  }
                }
              ],
              "limit": 10
            },
            "context": {
              "sqlOuterLimit": 101,
              "sqlQueryId": "ee616a36-c30c-4eae-af00-245127956e42",
              "useApproximateCountDistinct": false,
              "useApproximateTopN": false
            }
          }
        },
        "rightPrefix": "j0.",
        "condition": "(\"channel\" == \"j0.d0\")",
        "joinType": "INNER"
      },
      "dimension": {
        "type": "default",
        "dimension": "channel",
        "outputName": "d0",
        "outputType": "STRING"
      },
      "metric": {
        "type": "dimension",
        "ordering": {
          "type": "lexicographic"
        }
      },
      "threshold": 101,
      "intervals": {
        "type": "intervals",
        "intervals": [
          "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"
        ]
      },
      "granularity": {
        "type": "all"
      },
      "aggregations": [
        {
          "type": "count",
          "name": "a0"
        }
      ],
      "context": {
        "sqlOuterLimit": 101,
        "sqlQueryId": "ee616a36-c30c-4eae-af00-245127956e42",
        "useApproximateCountDistinct": false,
        "useApproximateTopN": false
      }
    },
    "signature": [
      {
        "name": "d0",
        "type": "STRING"
      },
      {
        "name": "a0",
        "type": "LONG"
      }
    ]
  }
]

In this case, the IN subquery in the WHERE clause is translated to a join datasource. See the Joins section for more details about how this works.
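A plan like the one above can be long, so it can help to reduce it to an outline. The following is a minimal Python sketch, not part of Druid, that walks the JSON array from the PLAN column and lists each native query type with the type of datasource it reads. The function name summarize_plan and the trimmed SAMPLE_PLAN are hypothetical; the sketch assumes only the field names visible in the plan above ("query", "queryType", "dataSource", "type", "left", "right").

```python
import json


def summarize_plan(plan_json):
    """Return (depth, queryType, dataSource type) for every native query in the plan."""
    rows = []

    def visit_query(query, depth):
        ds = query.get("dataSource")
        # A datasource is either an object with a "type" or a bare table name.
        ds_type = ds.get("type", "unknown") if isinstance(ds, dict) else "table"
        rows.append((depth, query.get("queryType", "unknown"), ds_type))
        if isinstance(ds, dict):
            visit_datasource(ds, depth + 1)

    def visit_datasource(ds, depth):
        if ds.get("type") == "query":
            # A subquery: descend into the nested native query.
            visit_query(ds["query"], depth)
        elif ds.get("type") == "join":
            # A join: inspect both inputs.
            for side in ("left", "right"):
                child = ds.get(side)
                if isinstance(child, dict):
                    visit_datasource(child, depth)

    for entry in json.loads(plan_json):
        visit_query(entry["query"], 0)
    return rows


# Trimmed, hypothetical copy of the plan shown above.
SAMPLE_PLAN = json.dumps([{
    "query": {
        "queryType": "topN",
        "dataSource": {
            "type": "join",
            "left": {"type": "table", "name": "wikipedia"},
            "right": {"type": "query", "query": {
                "queryType": "groupBy",
                "dataSource": {"type": "table", "name": "wikipedia"},
            }},
        },
    },
    "signature": [],
}])

for depth, query_type, ds_type in summarize_plan(SAMPLE_PLAN):
    print("  " * depth + f"{query_type} over {ds_type}")
```

The outline makes it easy to spot a query that reads from a join, and a nested query feeding that join, without reading the full plan.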

You can confirm this with Druid's request logging feature. After you enable request logging and run the query, the request log shows the native query that actually ran:

{
  "queryType": "groupBy",
  "dataSource": {
    "type": "join",
    "left": "wikipedia",
    "right": {
      "type": "query",
      "query": {
        "queryType": "topN",
        "dataSource": "wikipedia",
        "dimension": {"type": "default", "dimension": "page", "outputName": "d0"},
        "metric": {"type": "numeric", "metric": "a0"},
        "threshold": 10,
        "intervals": "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z",
        "granularity": "all",
        "aggregations": [
          { "type": "count", "name": "a0"}
        ]
      }
    },
    "rightPrefix": "j0.",
    "condition": "(\"channel\" == \"j0.d0\")",
    "joinType": "INNER"
  },
  "intervals": "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z",
  "granularity": "all",
  "dimensions": [
    {"type": "default", "dimension": "channel", "outputName": "d0"}
  ],
  "aggregations": [
    { "type": "count", "name": "a0"}
  ]
}

Query types

Druid SQL uses four different native query types.

  • Scan is used for queries that do not aggregate (no GROUP BY, no DISTINCT).

  • Timeseries is used for queries that GROUP BY FLOOR(__time TO unit) or TIME_FLOOR(__time, period), have no other grouping expressions, no HAVING or LIMIT clauses, no nesting, and either no ORDER BY, or an ORDER BY that orders by the same expression as the one in the GROUP BY. Druid SQL also uses Timeseries for "grand total" queries that have aggregation functions but no GROUP BY. This query type takes advantage of the fact that Druid segments are sorted by time.

  • TopN is used by default for queries that group by a single expression, do have ORDER BY and LIMIT clauses, do not have HAVING clauses, and are not nested. However, the TopN query type will deliver approximate ranking and results in some cases; if you want to avoid this, set "useApproximateTopN" to "false". TopN results are always computed in memory. See the TopN documentation for more details.

  • GroupBy is used for all other aggregations, including any nested aggregation queries. Druid's GroupBy is a traditional aggregation engine: it delivers exact results and rankings and supports a wide variety of features. GroupBy aggregates in memory if it can, but it may spill to disk if it doesn't have enough memory to complete your query. Results are streamed back from data processes through the Broker if you ORDER BY the same expressions in your GROUP BY clause, or if you don't have an ORDER BY at all. If your query has an ORDER BY referencing expressions that don't appear in the GROUP BY clause (like aggregation functions) then the Broker will materialize a list of results in memory, up to a max of your LIMIT, if any. See the GroupBy documentation for details about tuning performance and memory use.
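The rules in this list can be read as a small decision procedure. The following Python sketch encodes only what the list says; it is not the planner's actual logic, and the QueryShape fields and the function name are hypothetical. The real planner applies more rules than the list covers: for instance, the EXPLAIN PLAN example earlier in this document shows a topN query even though "useApproximateTopN" is false.

```python
from dataclasses import dataclass


@dataclass
class QueryShape:
    """Hypothetical description of a SQL query, limited to what the rules mention."""
    aggregates: bool                        # GROUP BY, DISTINCT, or aggregation functions
    group_by_exprs: int = 0                 # number of grouping expressions
    groups_by_time_floor_only: bool = False  # sole grouping is FLOOR(__time TO unit) or TIME_FLOOR
    has_having: bool = False
    has_limit: bool = False
    has_order_by: bool = False
    order_by_matches_group_by: bool = False
    nested: bool = False
    use_approximate_topn: bool = True


def expected_native_query_type(q):
    """Return the native query type suggested by the rules in the list above."""
    if not q.aggregates:
        return "scan"
    if q.nested:
        return "groupBy"  # nested aggregations always use GroupBy
    if q.group_by_exprs == 0:
        return "timeseries"  # "grand total": aggregation functions without GROUP BY
    time_only = q.group_by_exprs == 1 and q.groups_by_time_floor_only
    order_ok = not q.has_order_by or q.order_by_matches_group_by
    if time_only and not q.has_having and not q.has_limit and order_ok:
        return "timeseries"
    if (q.group_by_exprs == 1 and q.has_order_by and q.has_limit
            and not q.has_having and q.use_approximate_topn):
        return "topN"
    return "groupBy"


# GROUP BY channel ORDER BY COUNT(*) DESC LIMIT 10
print(expected_native_query_type(
    QueryShape(aggregates=True, group_by_exprs=1, has_order_by=True, has_limit=True)))
```

Treat the result as a first guess and check it against EXPLAIN PLAN output or the request log.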

Time filters

For all native query types, filters on the __time column will be translated into top-level query "intervals" whenever possible, which allows Druid to use its global time index to quickly prune the set of data that must be scanned. Consider this (non-exhaustive) list of time filters that will be recognized and translated to "intervals":

  • __time >= TIMESTAMP '2000-01-01 00:00:00' (comparison to absolute time)
  • __time >= CURRENT_TIMESTAMP - INTERVAL '8' HOUR (comparison to relative time)
  • FLOOR(__time TO DAY) = TIMESTAMP '2000-01-01 00:00:00' (specific day)

Refer to the Interpreting EXPLAIN PLAN output section for details on confirming that time filters are being translated as you expect.
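One way to run that check on a native query taken from EXPLAIN PLAN output or from the request log is sketched below in Python. It assumes that a query without a usable __time filter carries the all-encompassing interval that appears in the example plan in this document, and it accepts the "intervals" shapes shown there (an object with an "intervals" array, or a bare string). The names ETERNITY, interval_strings, and time_filter_pushed_down are hypothetical helpers, not Druid APIs.

```python
# Interval that appears in this document's example plan when the SQL query has no __time filter.
ETERNITY = "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"


def interval_strings(intervals):
    """Normalize "intervals" to a list of interval strings."""
    if isinstance(intervals, str):
        return [intervals]
    if isinstance(intervals, dict):
        # Shape used in EXPLAIN PLAN output: {"type": "intervals", "intervals": [...]}
        return list(intervals.get("intervals", []))
    return list(intervals or [])


def time_filter_pushed_down(native_query):
    """Return True if the query's top-level intervals are narrower than eternity."""
    return ETERNITY not in interval_strings(native_query.get("intervals"))


unfiltered = {"queryType": "timeseries",
              "intervals": {"type": "intervals", "intervals": [ETERNITY]}}
filtered = {"queryType": "timeseries",
            "intervals": {"type": "intervals",
                          "intervals": ["2000-01-01T00:00:00.000Z/2000-01-02T00:00:00.000Z"]}}

print(time_filter_pushed_down(unfiltered))  # False
print(time_filter_pushed_down(filtered))    # True
```

If the function returns False for a query in which you wrote a __time filter, the filter was probably translated to a regular filter instead, and rewriting it in one of the forms listed above may help.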

Joins

SQL join operators are translated to native join datasources as follows:

  1. Joins that the native layer can handle directly are translated literally, to a join datasource whose left, right, and condition are faithful translations of the original SQL. This includes any SQL join where the right-hand side is a lookup or subquery, and where the condition is an equality where one side is an expression based on the left-hand table, the other side is a simple column reference to the right-hand table, and both sides of the equality are the same data type.

  2. If a join cannot be handled directly by a native join datasource as written, Druid SQL will insert subqueries to make it runnable. For example, foo INNER JOIN bar ON foo.abc = LOWER(bar.def) cannot be directly translated, because there is an expression on the right-hand side instead of a simple column access. A subquery will be inserted that effectively transforms this clause to foo INNER JOIN (SELECT LOWER(def) AS def FROM bar) t ON foo.abc = t.def.

  3. Druid SQL does not currently reorder joins to optimize queries.

Refer to the Interpreting EXPLAIN PLAN output section for details on confirming that joins are being translated as you expect.

Refer to the Query execution page for information about how joins are executed.
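To find subqueries underneath joins in a translated query, you can walk its datasource tree. The following Python sketch is a hypothetical helper, not a Druid API. It assumes only the datasource fields shown in the example plan in this document ("type", "left", "right", "query") and reports the path of every join input that is a subquery, whether you wrote that subquery yourself or Druid SQL inserted it.

```python
def subqueries_under_joins(datasource, path="dataSource"):
    """Yield the path of every join input whose datasource type is "query"."""
    if not isinstance(datasource, dict):
        return  # a bare table name has no children
    ds_type = datasource.get("type")
    if ds_type == "join":
        for side in ("left", "right"):
            child = datasource.get(side)
            child_path = f"{path}.{side}"
            if isinstance(child, dict) and child.get("type") == "query":
                yield child_path
            # Keep descending: joins and subqueries can nest.
            yield from subqueries_under_joins(child, child_path)
    elif ds_type == "query":
        inner = datasource.get("query", {})
        yield from subqueries_under_joins(
            inner.get("dataSource"), f"{path}.query.dataSource")


# Trimmed, hypothetical copy of the join datasource from the example plan.
example_datasource = {
    "type": "join",
    "left": {"type": "table", "name": "wikipedia"},
    "right": {"type": "query", "query": {
        "queryType": "groupBy",
        "dataSource": {"type": "table", "name": "wikipedia"},
    }},
    "rightPrefix": "j0.",
    "joinType": "INNER",
}

print(list(subqueries_under_joins(example_datasource)))
```

A reported path that does not correspond to a subquery in your SQL suggests an implicit subquery, for example one caused by mismatched types or by an expression on the right-hand side of the join condition.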

Subqueries

Subqueries in SQL are generally translated to native query datasources. Refer to the Query execution page for information about how subqueries are executed.

Note: Subqueries in the WHERE clause, like WHERE col1 IN (SELECT foo FROM ...) are translated to inner joins.

Approximations

Druid SQL will use approximate algorithms in some situations:

  • The COUNT(DISTINCT col) aggregation function uses a variant of HyperLogLog, a fast approximate distinct counting algorithm, by default. Druid SQL will switch to exact distinct counts if you set "useApproximateCountDistinct" to "false", either through query context or through Broker configuration.

  • GROUP BY queries over a single column with ORDER BY and LIMIT may be executed using the TopN engine, which uses an approximate algorithm. Druid SQL will switch to an exact grouping algorithm if you set "useApproximateTopN" to "false", either through query context or through Broker configuration.

  • Aggregation functions that are labeled as using sketches or approximations, such as APPROX_COUNT_DISTINCT, are always approximate, regardless of configuration.
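As an illustration of the query context route, the following Python sketch builds a JSON request body that turns off both approximations for a single query. It assumes the request shape of the Druid SQL API, a JSON object with "query" and "context" fields; the function name build_sql_request is hypothetical, and the sketch only builds the body and does not send it.

```python
import json


def build_sql_request(sql, exact=True):
    """Build a JSON body for the Druid SQL API (request shape assumed from its documentation)."""
    context = {}
    if exact:
        # Context parameters named in the list above.
        context["useApproximateCountDistinct"] = False
        context["useApproximateTopN"] = False
    return json.dumps({"query": sql, "context": context})


body = build_sql_request(
    "SELECT channel, COUNT(DISTINCT page) FROM wikipedia "
    "GROUP BY channel ORDER BY 2 DESC LIMIT 10"
)
print(body)
```

Functions such as APPROX_COUNT_DISTINCT stay approximate even with this context, as the last item in the list notes.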

A known issue with approximate functions based on data sketches

The APPROX_QUANTILE_DS and DS_QUANTILES_SKETCH functions can fail with an IllegalStateException if one of the sketches for the query hits maxStreamLength: the maximum number of items to store in each sketch. See GitHub issue 11544 for more details. To work around the issue, increase the maximum stream length with the approxQuantileDsMaxStreamLength parameter in the query context. Since it is set to 1,000,000,000 by default, you don't need to override it in most cases. See the accuracy information in the DataSketches documentation for how many bytes are required per stream length. This query context parameter is a temporary solution to avoid the known issue. It may be removed in a future release after the bug is fixed.

Unsupported features

Druid does not support all SQL features. In particular, the following features are not supported.

  • JOIN between native datasources (table, lookup, subquery) and system tables.
  • JOIN conditions that are not an equality between expressions from the left- and right-hand sides.
  • JOIN conditions containing a constant value inside the condition.
  • JOIN conditions on a column which contains a multi-value dimension.
  • OVER clauses, and analytic functions such as LAG and LEAD.
  • ORDER BY for a non-aggregating query, except for ORDER BY __time or ORDER BY __time DESC, which are supported. This restriction only applies to non-aggregating queries; you can ORDER BY any column in an aggregating query.
  • DDL and DML.
  • Using Druid-specific functions like TIME_PARSE and APPROX_QUANTILE_DS on system tables.

Additionally, some Druid native query features are not supported by the SQL language. Some unsupported Druid features include:

  • Inline datasources.
  • Spatial filters.
  • Multi-value dimensions are only partially implemented in Druid SQL. There are known inconsistencies between their behavior in SQL queries and in native queries due to how they are currently treated by the SQL planner.
Copyright © 2022 Apache Software Foundation.
Except where otherwise noted, licensed under CC BY-SA 4.0.
Apache Druid, Druid, and the Druid logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.