Apache Druid
  • Technology
  • Use Cases
  • Powered By
  • Docs
  • Community
  • Apache
  • Download

›Batch ingestion

Getting started

  • Introduction to Apache Druid
  • Quickstart
  • Single server deployment
  • Clustered deployment

Tutorials

  • Loading files natively
  • Load from Apache Kafka
  • Load from Apache Hadoop
  • Querying data
  • Roll-up
  • Configuring data retention
  • Updating existing data
  • Compacting segments
  • Deleting data
  • Writing an ingestion spec
  • Transforming input data
  • Kerberized HDFS deep storage

Design

  • Design
  • Segments
  • Processes and servers
  • Deep storage
  • Metadata storage
  • ZooKeeper

Data ingestion

  • Ingestion
  • Data formats
  • Schema design tips
  • Data management
  • Stream ingestion

    • Apache Kafka
    • Amazon Kinesis
    • Tranquility

    Batch ingestion

    • Native batch
    • Hadoop-based
  • Task reference
  • Troubleshooting FAQ

Querying

  • Druid SQL
  • Native query types

    • Making native queries
    • Timeseries
    • TopN
    • GroupBy
    • Scan
    • TimeBoundary
    • SegmentMetadata
    • DatasourceMetadata
    • Search
    • Select
  • Multi-value dimensions
  • Lookups
  • Joins
  • Multitenancy considerations
  • Query caching
  • Spatial filters

Configuration

  • Configuration reference
  • Extensions
  • Logging

Operations

  • Management UIs
  • Basic cluster tuning
  • API reference
  • High availability
  • Rolling updates
  • Retaining or automatically dropping data
  • Metrics
  • Alerts
  • Working with different versions of Apache Hadoop
  • HTTP compression
  • Recommendations
  • TLS support
  • Password providers
  • dump-segment tool
  • reset-cluster tool
  • insert-segment-to-db tool
  • pull-deps tool
  • Misc

    • Deep storage migration
    • Web console
    • Export Metadata Tool
    • Getting started with Apache Druid
    • Metadata Migration
    • Segment Size Optimization
    • Content for build.sbt

Development

  • Developing on Druid
  • Creating extensions
  • JavaScript functionality
  • Build from source
  • Versioning
  • Experimental features

Misc

  • Expressions
  • Papers

Hidden

  • Apache Druid vs Elasticsearch
  • Apache Druid vs. Key/Value Stores (HBase/Cassandra/OpenTSDB)
  • Apache Druid vs Kudu
  • Apache Druid vs Redshift
  • Apache Druid vs Spark
  • Apache Druid vs SQL-on-Hadoop
  • Authentication and Authorization
  • Broker
  • Coordinator Process
  • Historical Process
  • Indexer Process
  • Indexing Service
  • MiddleManager Process
  • Overlord Process
  • Router Process
  • Peons
  • Approximate Histogram aggregators
  • Apache Avro
  • Bloom Filter
  • DataSketches extension
  • DataSketches HLL Sketch module
  • DataSketches Quantiles Sketch module
  • DataSketches Theta Sketch module
  • DataSketches Tuple Sketch module
  • Basic Security
  • Kerberos
  • Cached Lookup Module
  • Google Cloud Storage
  • HDFS
  • Apache Kafka Lookups
  • Globally Cached Lookups
  • MySQL Metadata Store
  • ORC Extension
  • Apache Parquet Extension
  • PostgreSQL Metadata Store
  • Protobuf
  • S3-compatible
  • Simple SSLContext Provider Module
  • Stats aggregator
  • Test Stats Aggregators
  • Ambari Metrics Emitter
  • Microsoft Azure
  • Apache Cassandra
  • Rackspace Cloud Files
  • DistinctCount Aggregator
  • Graphite Emitter
  • Aggregations
  • Datasources
  • Transforming Dimension Values
  • Query Filters
  • Aggregation Granularity
  • Filter groupBy query results
  • Cardinality/HyperUnique aggregators
  • Sort groupBy query results
  • Post-Aggregations
  • Query context
  • Refining search queries
  • Sorting Orders
  • TopNMetricSpec
  • Virtual Columns
  • InfluxDB Line Protocol Parser
  • InfluxDB Emitter
  • Kafka Emitter
  • Materialized View
  • Moment Sketches for Approximate Quantiles module
  • development/extensions-contrib/moving-average-query
  • OpenTSDB Emitter
  • Druid Redis Cache
  • Microsoft SQLServer
  • StatsD Emitter
  • T-Digest Quantiles Sketch module
  • Thrift
  • Timestamp Min/Max aggregators
  • Realtime Process
Edit

Native batch ingestion

Apache Druid (incubating) currently has two types of native batch indexing tasks, index_parallel which can run multiple tasks in parallel, and index which will run a single indexing task. Please refer to our Hadoop-based vs. native batch comparison table for comparisons between Hadoop-based, native batch (simple), and native batch (parallel) ingestion.

To run either kind of native batch indexing task, write an ingestion spec as specified below. Then POST it to the /druid/indexer/v1/task endpoint on the Overlord, or use the bin/post-index-task script included with Druid.

Tutorial

This page contains reference documentation for native batch ingestion. For a walk-through instead, check out the Loading a file tutorial, which demonstrates the "simple" (single-task) mode.

Parallel task

The Parallel task (type index_parallel) is a task for parallel batch indexing. This task only uses Druid's resource and doesn't depend on other external systems like Hadoop. index_parallel task is a supervisor task which basically generates multiple worker tasks and submits them to the Overlord. Each worker task reads input data and creates segments. Once they successfully generate segments for all input data, they report the generated segment list to the supervisor task. The supervisor task periodically checks the status of worker tasks. If one of them fails, it retries the failed task until the number of retries reaches the configured limit. If all worker tasks succeed, then it publishes the reported segments at once.

The parallel Index Task can run in two different modes depending on forceGuaranteedRollup in tuningConfig. If forceGuaranteedRollup = false, it's executed in a single phase. In this mode, each sub task creates segments individually and reports them to the supervisor task.

If forceGuaranteedRollup = true, it's executed in two phases with data shuffle which is similar to MapReduce. In the first phase, each sub task partitions input data based on segmentGranularity (primary partition key) in granularitySpec and partitionDimensions (secondary partition key) in partitionsSpec. The partitioned data is served by the middleManager or the indexer where the first phase tasks ran. In the second phase, each sub task fetches partitioned data from MiddleManagers or indexers and merges them to create the final segments. As in the single phase execution, the created segments are reported to the supervisor task to publish at once.

To use this task, the firehose in ioConfig should be splittable and maxNumConcurrentSubTasks should be set something larger than 1 in tuningConfig. Otherwise, this task runs sequentially. Here is the list of currently splittable firehoses.

  • LocalFirehose
  • IngestSegmentFirehose
  • HttpFirehose
  • StaticS3Firehose
  • StaticAzureBlobStoreFirehose
  • StaticGoogleBlobStoreFirehose
  • StaticCloudFilesFirehose

The splittable firehose is responsible for generating splits. The supervisor task generates worker task specs containing a split and submits worker tasks using those specs. As a result, the number of worker tasks depends on the implementation of splittable firehoses. Please note that multiple tasks can be created for the same worker task spec if one of them fails.

You may want to consider the below things:

  • The number of concurrent tasks run in parallel ingestion is determined by maxNumConcurrentSubTasks in the tuningConfig. The supervisor task checks the number of current running sub tasks and creates more if it's smaller than maxNumConcurrentSubTasks no matter how many task slots are currently available. This may affect to other ingestion performance. See the below Capacity Planning section for more details.
  • By default, batch ingestion replaces all data in any segment that it writes to. If you'd like to add to the segment instead, set the appendToExisting flag in ioConfig. Note that it only replaces data in segments where it actively adds data: if there are segments in your granularitySpec's intervals that have no data written by this task, they will be left alone.

Task syntax

A sample task is shown below:

{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "wikipedia_parallel_index_test",
      "metricsSpec": [
        {
          "type": "count",
              "name": "count"
            },
            {
              "type": "doubleSum",
              "name": "added",
              "fieldName": "added"
            },
            {
              "type": "doubleSum",
              "name": "deleted",
              "fieldName": "deleted"
            },
            {
              "type": "doubleSum",
              "name": "delta",
              "fieldName": "delta"
            }
        ],
        "granularitySpec": {
          "segmentGranularity": "DAY",
          "queryGranularity": "second",
          "intervals" : [ "2013-08-31/2013-09-02" ]
        },
        "parser": {
          "parseSpec": {
            "format" : "json",
            "timestampSpec": {
              "column": "timestamp"
            },
            "dimensionsSpec": {
              "dimensions": [
                "page",
                "language",
                "user",
                "unpatrolled",
                "newPage",
                "robot",
                "anonymous",
                "namespace",
                "continent",
                "country",
                "region",
                "city"
              ]
            }
          }
        }
    },
    "ioConfig": {
        "type": "index_parallel",
        "firehose": {
          "type": "local",
          "baseDir": "examples/indexing/",
          "filter": "wikipedia_index_data*"
        }
    },
    "tuningconfig": {
        "type": "index_parallel",
        "maxNumConcurrentSubTasks": 2
    }
  }
}
propertydescriptionrequired?
typeThe task type, this should always be index_parallel.yes
idThe task ID. If this is not explicitly specified, Druid generates the task ID using task type, data source name, interval, and date-time stamp.no
specThe ingestion spec including the data schema, IOConfig, and TuningConfig. See below for more details.yes
contextContext containing various task configuration parameters. See below for more details.no

dataSchema

This field is required.

See Ingestion Spec DataSchema

If you specify intervals explicitly in your dataSchema's granularitySpec, batch ingestion will lock the full intervals specified when it starts up, and you will learn quickly if the specified interval overlaps with locks held by other tasks (e.g., Kafka ingestion). Otherwise, batch ingestion will lock each interval as it is discovered, so you may only learn that the task overlaps with a higher-priority task later in ingestion. If you specify intervals explicitly, any rows outside the specified intervals will be thrown away. We recommend setting intervals explicitly if you know the time range of the data so that locking failure happens faster, and so that you don't accidentally replace data outside that range if there's some stray data with unexpected timestamps.

ioConfig

propertydescriptiondefaultrequired?
typeThe task type, this should always be index_parallel.noneyes
firehoseSpecify a Firehose here.noneyes
appendToExistingCreates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This will only work if the existing segment set has extendable-type shardSpecs.falseno

tuningConfig

The tuningConfig is optional and default parameters will be used if no tuningConfig is specified. See below for more details.

propertydescriptiondefaultrequired?
typeThe task type, this should always be index_parallel.noneyes
maxRowsPerSegmentDeprecated. Use partitionsSpec instead. Used in sharding. Determines how many rows are in each segment.5000000no
maxRowsInMemoryUsed in determining when intermediate persists to disk should occur. Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.1000000no
maxBytesInMemoryUsed in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)1/6 of max JVM memoryno
maxTotalRowsDeprecated. Use partitionsSpec instead. Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.20000000no
numShardsDeprecated. Use partitionsSpec instead. Directly specify the number of shards to create. If this is specified and intervals is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.nullno
partitionsSpecDefines how to partition data in each timeChunk, see PartitionsSpecdynamic if forceGuaranteedRollup = false, hashed if forceGuaranteedRollup = trueno
indexSpecDefines segment storage format options to be used at indexing time, see IndexSpecnullno
indexSpecForIntermediatePersistsDefines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see IndexSpec for possible values.same as indexSpecno
maxPendingPersistsMaximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)no
forceGuaranteedRollupForces guaranteeing the perfect rollup. The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased. If this is set to true, numShards in tuningConfig and intervals in granularitySpec must be set. Note that the result segments would be hash-partitioned. This flag cannot be used with appendToExisting of IOConfig. For more details, see the below Segment pushing modes section.falseno
reportParseExceptionsIf true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.falseno
pushTimeoutMilliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.0no
segmentWriteOutMediumFactorySegment write-out medium to use when creating segments. See SegmentWriteOutMediumFactory.Not specified, the value from druid.peon.defaultSegmentWriteOutMediumFactory.type is usedno
maxNumConcurrentSubTasksMaximum number of tasks which can be run in parallel at the same time. The supervisor task would spawn worker tasks up to maxNumConcurrentSubTasks regardless of the current available task slots. If this value is set to 1, the supervisor task processes data ingestion on its own instead of spawning worker tasks. If this value is set to too large, too many worker tasks can be created which might block other ingestion. Check Capacity Planning for more details.1no
maxRetryMaximum number of retries on task failures.3no
maxNumSegmentsToMergeMax limit for the number of segments that a single task can merge at the same time in the second phase. Used only forceGuaranteedRollup is set.100no
totalNumMergeTasksTotal number of tasks to merge segments in the second phase when forceGuaranteedRollup is set.10no
taskStatusCheckPeriodMsPolling period in milliseconds to check running task statuses.1000no
chatHandlerTimeoutTimeout for reporting the pushed segments in worker tasks.PT10Sno
chatHandlerNumRetriesRetries for reporting the pushed segments in worker tasks.5no

partitionsSpec

PartitionsSpec is to describe the secondary partitioning method. You should use different partitionsSpec depending on the rollup mode you want. For perfect rollup, you should use hashed.

propertydescriptiondefaultrequired?
typeThis should always be hashednoneyes
numShardsDirectly specify the number of shards to create. If this is specified and intervals is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.nullno
partitionDimensionsThe dimensions to partition on. Leave blank to select all dimensions.nullno

For best-effort rollup, you should use dynamic.

propertydescriptiondefaultrequired?
typeThis should always be dynamicnoneyes
maxRowsPerSegmentUsed in sharding. Determines how many rows are in each segment.5000000no
maxTotalRowsTotal number of rows in segments waiting for being pushed. Used in determining when intermediate segment push should occur.20000000no

HTTP status endpoints

The supervisor task provides some HTTP endpoints to get running status.

  • http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/mode

Returns 'parallel' if the indexing task is running in parallel. Otherwise, it returns 'sequential'.

  • http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/phase

Returns the name of the current phase if the task running in the parallel mode.

  • http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/progress

Returns the current progress if the supervisor task is running in the parallel mode.

An example of the result is

{
  "running":10,
  "succeeded":0,
  "failed":0,
  "complete":0,
  "total":10,
  "expectedSucceeded":10
}
  • http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtasks/running

Returns the task IDs of running worker tasks, or an empty list if the supervisor task is running in the sequential mode.

  • http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspecs

Returns all worker task specs, or an empty list if the supervisor task is running in the sequential mode.

  • http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspecs/running

Returns running worker task specs, or an empty list if the supervisor task is running in the sequential mode.

  • http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspecs/complete

Returns complete worker task specs, or an empty list if the supervisor task is running in the sequential mode.

  • http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspec/{SUB_TASK_SPEC_ID}

Returns the worker task spec of the given id, or HTTP 404 Not Found error if the supervisor task is running in the sequential mode.

  • http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspec/{SUB_TASK_SPEC_ID}/state

Returns the state of the worker task spec of the given id, or HTTP 404 Not Found error if the supervisor task is running in the sequential mode. The returned result contains the worker task spec, a current task status if exists, and task attempt history.

An example of the result is

{
  "spec": {
    "id": "index_parallel_lineitem_2018-04-20T22:12:43.610Z_2",
    "groupId": "index_parallel_lineitem_2018-04-20T22:12:43.610Z",
    "supervisorTaskId": "index_parallel_lineitem_2018-04-20T22:12:43.610Z",
    "context": null,
    "inputSplit": {
      "split": "/path/to/data/lineitem.tbl.5"
    },
    "ingestionSpec": {
      "dataSchema": {
        "dataSource": "lineitem",
        "parser": {
          "type": "hadoopyString",
          "parseSpec": {
            "format": "tsv",
            "delimiter": "|",
            "timestampSpec": {
              "column": "l_shipdate",
              "format": "yyyy-MM-dd"
            },
            "dimensionsSpec": {
              "dimensions": [
                "l_orderkey",
                "l_partkey",
                "l_suppkey",
                "l_linenumber",
                "l_returnflag",
                "l_linestatus",
                "l_shipdate",
                "l_commitdate",
                "l_receiptdate",
                "l_shipinstruct",
                "l_shipmode",
                "l_comment"
              ]
            },
            "columns": [
              "l_orderkey",
              "l_partkey",
              "l_suppkey",
              "l_linenumber",
              "l_quantity",
              "l_extendedprice",
              "l_discount",
              "l_tax",
              "l_returnflag",
              "l_linestatus",
              "l_shipdate",
              "l_commitdate",
              "l_receiptdate",
              "l_shipinstruct",
              "l_shipmode",
              "l_comment"
            ]
          }
        },
        "metricsSpec": [
          {
            "type": "count",
            "name": "count"
          },
          {
            "type": "longSum",
            "name": "l_quantity",
            "fieldName": "l_quantity",
            "expression": null
          },
          {
            "type": "doubleSum",
            "name": "l_extendedprice",
            "fieldName": "l_extendedprice",
            "expression": null
          },
          {
            "type": "doubleSum",
            "name": "l_discount",
            "fieldName": "l_discount",
            "expression": null
          },
          {
            "type": "doubleSum",
            "name": "l_tax",
            "fieldName": "l_tax",
            "expression": null
          }
        ],
        "granularitySpec": {
          "type": "uniform",
          "segmentGranularity": "YEAR",
          "queryGranularity": {
            "type": "none"
          },
          "rollup": true,
          "intervals": [
            "1980-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"
          ]
        },
        "transformSpec": {
          "filter": null,
          "transforms": []
        }
      },
      "ioConfig": {
        "type": "index_parallel",
        "firehose": {
          "type": "local",
          "baseDir": "/path/to/data/",
          "filter": "lineitem.tbl.5",
          "parser": null
        },
        "appendToExisting": false
      },
      "tuningConfig": {
        "type": "index_parallel",
        "maxRowsPerSegment": 5000000,
        "maxRowsInMemory": 1000000,
        "maxTotalRows": 20000000,
        "numShards": null,
        "indexSpec": {
          "bitmap": {
            "type": "concise"
          },
          "dimensionCompression": "lz4",
          "metricCompression": "lz4",
          "longEncoding": "longs"
        },
        "indexSpecForIntermediatePersists": {
          "bitmap": {
            "type": "concise"
          },
          "dimensionCompression": "lz4",
          "metricCompression": "lz4",
          "longEncoding": "longs"
        },
        "maxPendingPersists": 0,
        "reportParseExceptions": false,
        "pushTimeout": 0,
        "segmentWriteOutMediumFactory": null,
        "maxNumConcurrentSubTasks": 4,
        "maxRetry": 3,
        "taskStatusCheckPeriodMs": 1000,
        "chatHandlerTimeout": "PT10S",
        "chatHandlerNumRetries": 5,
        "logParseExceptions": false,
        "maxParseExceptions": 2147483647,
        "maxSavedParseExceptions": 0,
        "forceGuaranteedRollup": false,
        "buildV9Directly": true
      }
    }
  },
  "currentStatus": {
    "id": "index_sub_lineitem_2018-04-20T22:16:29.922Z",
    "type": "index_sub",
    "createdTime": "2018-04-20T22:16:29.925Z",
    "queueInsertionTime": "2018-04-20T22:16:29.929Z",
    "statusCode": "RUNNING",
    "duration": -1,
    "location": {
      "host": null,
      "port": -1,
      "tlsPort": -1
    },
    "dataSource": "lineitem",
    "errorMsg": null
  },
  "taskHistory": []
}
  • http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspec/{SUB_TASK_SPEC_ID}/history

Returns the task attempt history of the worker task spec of the given id, or HTTP 404 Not Found error if the supervisor task is running in the sequential mode.

Capacity planning

The supervisor task can create up to maxNumConcurrentSubTasks worker tasks no matter how many task slots are currently available. As a result, total number of tasks which can be run at the same time is (maxNumConcurrentSubTasks + 1) (including the supervisor task). Please note that this can be even larger than total number of task slots (sum of the capacity of all workers). If maxNumConcurrentSubTasks is larger than n (available task slots), then maxNumConcurrentSubTasks tasks are created by the supervisor task, but only n tasks would be started. Others will wait in the pending state until any running task is finished.

If you are using the Parallel Index Task with stream ingestion together, we would recommend to limit the max capacity for batch ingestion to prevent stream ingestion from being blocked by batch ingestion. Suppose you have t Parallel Index Tasks to run at the same time, but want to limit the max number of tasks for batch ingestion to b. Then, (sum of maxNumConcurrentSubTasks of all Parallel Index Tasks + t (for supervisor tasks)) must be smaller than b.

If you have some tasks of a higher priority than others, you may set their maxNumConcurrentSubTasks to a higher value than lower priority tasks. This may help the higher priority tasks to finish earlier than lower priority tasks by assigning more task slots to them.

Simple task

The simple task (type index) is designed to be used for smaller data sets. The task executes within the indexing service.

Task syntax

A sample task is shown below:

{
  "type" : "index",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "wikipedia",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "auto"
          },
          "dimensionsSpec" : {
            "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [
        {
          "type" : "count",
          "name" : "count"
        },
        {
          "type" : "doubleSum",
          "name" : "added",
          "fieldName" : "added"
        },
        {
          "type" : "doubleSum",
          "name" : "deleted",
          "fieldName" : "deleted"
        },
        {
          "type" : "doubleSum",
          "name" : "delta",
          "fieldName" : "delta"
        }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "DAY",
        "queryGranularity" : "NONE",
        "intervals" : [ "2013-08-31/2013-09-01" ]
      }
    },
    "ioConfig" : {
      "type" : "index",
      "firehose" : {
        "type" : "local",
        "baseDir" : "examples/indexing/",
        "filter" : "wikipedia_data.json"
       }
    },
    "tuningConfig" : {
      "type" : "index",
      "maxRowsPerSegment" : 5000000,
      "maxRowsInMemory" : 1000000
    }
  }
}
propertydescriptionrequired?
typeThe task type, this should always be index.yes
idThe task ID. If this is not explicitly specified, Druid generates the task ID using task type, data source name, interval, and date-time stamp.no
specThe ingestion spec including the data schema, IOConfig, and TuningConfig. See below for more details.yes
contextContext containing various task configuration parameters. See below for more details.no

dataSchema

This field is required.

See the dataSchema section of the ingestion docs for details.

If you do not specify intervals explicitly in your dataSchema's granularitySpec, the Local Index Task will do an extra pass over the data to determine the range to lock when it starts up. If you specify intervals explicitly, any rows outside the specified intervals will be thrown away. We recommend setting intervals explicitly if you know the time range of the data because it allows the task to skip the extra pass, and so that you don't accidentally replace data outside that range if there's some stray data with unexpected timestamps.

ioConfig

propertydescriptiondefaultrequired?
typeThe task type, this should always be "index".noneyes
firehoseSpecify a Firehose here.noneyes
appendToExistingCreates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This will only work if the existing segment set has extendable-type shardSpecs.falseno

tuningConfig

The tuningConfig is optional and default parameters will be used if no tuningConfig is specified. See below for more details.

propertydescriptiondefaultrequired?
typeThe task type, this should always be "index".noneyes
maxRowsPerSegmentDeprecated. Use partitionsSpec instead. Used in sharding. Determines how many rows are in each segment.5000000no
maxRowsInMemoryUsed in determining when intermediate persists to disk should occur. Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.1000000no
maxBytesInMemoryUsed in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)1/6 of max JVM memoryno
maxTotalRowsDeprecated. Use partitionsSpec instead. Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.20000000no
numShardsDeprecated. Use partitionsSpec instead. Directly specify the number of shards to create. If this is specified and intervals is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.nullno
partitionDimensionsDeprecated. Use partitionsSpec instead. The dimensions to partition on. Leave blank to select all dimensions. Only used with forceGuaranteedRollup = true, will be ignored otherwise.nullno
partitionsSpecDefines how to partition data in each timeChunk, see PartitionsSpecdynamic if forceGuaranteedRollup = false, hashed if forceGuaranteedRollup = trueno
indexSpecDefines segment storage format options to be used at indexing time, see IndexSpecnullno
indexSpecForIntermediatePersistsDefines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see IndexSpec for possible values.same as indexSpecno
maxPendingPersistsMaximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)no
forceGuaranteedRollupForces guaranteeing the perfect rollup. The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased. If this is set to true, the index task will read the entire input data twice: one for finding the optimal number of partitions per time chunk and one for generating segments. Note that the result segments would be hash-partitioned. This flag cannot be used with appendToExisting of IOConfig. For more details, see the below Segment pushing modes section.falseno
reportParseExceptionsDEPRECATED. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting reportParseExceptions to true will override existing configurations for maxParseExceptions and maxSavedParseExceptions, setting maxParseExceptions to 0 and limiting maxSavedParseExceptions to no more than 1.falseno
pushTimeoutMilliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.0no
segmentWriteOutMediumFactorySegment write-out medium to use when creating segments. See SegmentWriteOutMediumFactory.Not specified, the value from druid.peon.defaultSegmentWriteOutMediumFactory.type is usedno
logParseExceptionsIf true, log an error message when a parsing exception occurs, containing information about the row where the error occurred.falseno
maxParseExceptionsThe maximum number of parse exceptions that can occur before the task halts ingestion and fails. Overridden if reportParseExceptions is set.unlimitedno
maxSavedParseExceptionsWhen a parse exception occurs, Druid can keep track of the most recent parse exceptions. "maxSavedParseExceptions" limits how many exception instances will be saved. These saved exceptions will be made available after the task finishes in the task completion report. Overridden if reportParseExceptions is set.0no

partitionsSpec

PartitionsSpec is to describe the secondary partitioning method. You should use different partitionsSpec depending on the rollup mode you want. For perfect rollup, you should use hashed.

propertydescriptiondefaultrequired?
typeThis should always be hashednoneyes
maxRowsPerSegmentUsed in sharding. Determines how many rows are in each segment.5000000no
numShardsDirectly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.nullno
partitionDimensionsThe dimensions to partition on. Leave blank to select all dimensions.nullno

For best-effort rollup, you should use dynamic.

propertydescriptiondefaultrequired?
typeThis should always be dynamicnoneyes
maxRowsPerSegmentUsed in sharding. Determines how many rows are in each segment.5000000no
maxTotalRowsTotal number of rows in segments waiting for being pushed. Used in determining when intermediate segment push should occur.20000000no

segmentWriteOutMediumFactory

FieldTypeDescriptionRequired
typeStringSee Additional Peon Configuration: SegmentWriteOutMediumFactory for explanation and available options.yes

Segment pushing modes

While ingesting data using the Index task, it creates segments from the input data and pushes them. For segment pushing, the Index task supports two segment pushing modes, i.e., bulk pushing mode and incremental pushing mode for perfect rollup and best-effort rollup, respectively.

In the bulk pushing mode, every segment is pushed at the very end of the index task. Until then, created segments are stored in the memory and local storage of the process running the index task. As a result, this mode might cause a problem due to limited storage capacity, and is not recommended to use in production.

On the contrary, in the incremental pushing mode, segments are incrementally pushed, that is they can be pushed in the middle of the index task. More precisely, the index task collects data and stores created segments in the memory and disks of the process running that task until the total number of collected rows exceeds maxTotalRows. Once it exceeds, the index task immediately pushes all segments created until that moment, cleans all pushed segments up, and continues to ingest remaining data.

To enable bulk pushing mode, forceGuaranteedRollup should be set in the TuningConfig. Note that this option cannot be used with appendToExisting of IOConfig.

Firehoses are pluggable and thus the configuration schema can and will vary based on the type of the firehose.

FieldTypeDescriptionRequired
typeStringSpecifies the type of firehose. Each value will have its own configuration schema, firehoses packaged with Druid are described below.yes

Firehoses

There are several firehoses readily available in Druid, some are meant for examples, others can be used directly in a production environment.

For additional firehoses, please see our extensions list.

LocalFirehose

This Firehose can be used to read the data from files on local disk, and is mainly intended for proof-of-concept testing, and works with string typed parsers. This Firehose is splittable and can be used by native parallel index tasks. Since each split represents a file in this Firehose, each worker task of index_parallel will read a file. A sample local Firehose spec is shown below:

{
    "type": "local",
    "filter" : "*.csv",
    "baseDir": "/data/directory"
}
propertydescriptionrequired?
typeThis should be "local".yes
filterA wildcard filter for files. See here for more information.yes
baseDirdirectory to search recursively for files to be ingested.yes

HttpFirehose

This Firehose can be used to read the data from remote sites via HTTP, and works with string typed parsers. This Firehose is splittable and can be used by native parallel index tasks. Since each split represents a file in this Firehose, each worker task of index_parallel will read a file. A sample HTTP Firehose spec is shown below:

{
    "type": "http",
    "uris": ["http://example.com/uri1", "http://example2.com/uri2"]
}

The below configurations can be optionally used if the URIs specified in the spec require a Basic Authentication Header. Omitting these fields from your spec will result in HTTP requests with no Basic Authentication Header.

propertydescriptiondefault
httpAuthenticationUsernameUsername to use for authentication with specified URIsNone
httpAuthenticationPasswordPasswordProvider to use with specified URIsNone

Example with authentication fields using the DefaultPassword provider (this requires the password to be in the ingestion spec):

{
    "type": "http",
    "uris": ["http://example.com/uri1", "http://example2.com/uri2"],
    "httpAuthenticationUsername": "username",
    "httpAuthenticationPassword": "password123"
}

You can also use the other existing Druid PasswordProviders. Here is an example using the EnvironmentVariablePasswordProvider:

{
    "type": "http",
    "uris": ["http://example.com/uri1", "http://example2.com/uri2"],
    "httpAuthenticationUsername": "username",
    "httpAuthenticationPassword": {
        "type": "environment",
        "variable": "HTTP_FIREHOSE_PW"
    }
}

The below configurations can optionally be used for tuning the Firehose performance.

propertydescriptiondefault
maxCacheCapacityBytesMaximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.1073741824
maxFetchCapacityBytesMaximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.1073741824
prefetchTriggerBytesThreshold to trigger prefetching HTTP objects.maxFetchCapacityBytes / 2
fetchTimeoutTimeout for fetching an HTTP object.60000
maxFetchRetryMaximum retries for fetching an HTTP object.3

IngestSegmentFirehose

This Firehose can be used to read the data from existing druid segments, potentially using a new schema and changing the name, dimensions, metrics, rollup, etc. of the segment. This Firehose is splittable and can be used by native parallel index tasks. This firehose will accept any type of parser, but will only utilize the list of dimensions and the timestamp specification. A sample ingest Firehose spec is shown below:

{
    "type": "ingestSegment",
    "dataSource": "wikipedia",
    "interval": "2013-01-01/2013-01-02"
}
propertydescriptionrequired?
typeThis should be "ingestSegment".yes
dataSourceA String defining the data source to fetch rows from, very similar to a table in a relational databaseyes
intervalA String representing the ISO-8601 interval. This defines the time range to fetch the data over.yes
dimensionsThe list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned.no
metricsThe list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.no
filterSee Filtersno
maxInputSegmentBytesPerTaskWhen used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB.no

SqlFirehose

This Firehose can be used to ingest events residing in an RDBMS. The database connection information is provided as part of the ingestion spec. For each query, the results are fetched locally and indexed. If there are multiple queries from which data needs to be indexed, queries are prefetched in the background, up to maxFetchCapacityBytes bytes. This firehose will accept any type of parser, but will only utilize the list of dimensions and the timestamp specification. See the extension documentation for more detailed ingestion examples.

Requires one of the following extensions:

  • MySQL Metadata Store.
  • PostgreSQL Metadata Store.
{
    "type": "sql",
    "database": {
        "type": "mysql",
        "connectorConfig": {
            "connectURI": "jdbc:mysql://host:port/schema",
            "user": "user",
            "password": "password"
        }
     },
    "sqls": ["SELECT * FROM table1", "SELECT * FROM table2"]
}
propertydescriptiondefaultrequired?
typeThis should be "sql".Yes
databaseSpecifies the database connection details.Yes
maxCacheCapacityBytesMaximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.1073741824No
maxFetchCapacityBytesMaximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.1073741824No
prefetchTriggerBytesThreshold to trigger prefetching SQL result objects.maxFetchCapacityBytes / 2No
fetchTimeoutTimeout for fetching the result set.60000No
foldCaseToggle case folding of database column names. This may be enabled in cases where the database returns case insensitive column names in query results.falseNo
sqlsList of SQL queries where each SQL query would retrieve the data to be indexed.Yes

Database

propertydescriptiondefaultrequired?
typeThe type of database to query. Valid values are mysql and postgresql_Yes
connectorConfigSpecify the database connection properties via connectURI, user and passwordYes

InlineFirehose

This Firehose can be used to read the data inlined in its own spec. It can be used for demos or for quickly testing out parsing and schema, and works with string typed parsers. A sample inline Firehose spec is shown below:

{
    "type": "inline",
    "data": "0,values,formatted\n1,as,CSV"
}
propertydescriptionrequired?
typeThis should be "inline".yes
dataInlined data to ingest.yes

CombiningFirehose

This Firehose can be used to combine and merge data from a list of different Firehoses.

{
    "type": "combining",
    "delegates": [ { firehose1 }, { firehose2 }, ... ]
}
propertydescriptionrequired?
typeThis should be "combining"yes
delegatesList of Firehoses to combine data fromyes
← Realtime ProcessHadoop-based →
  • Tutorial
  • Parallel task
    • Task syntax
    • dataSchema
    • ioConfig
    • tuningConfig
    • partitionsSpec
    • HTTP status endpoints
    • Capacity planning
  • Simple task
    • Task syntax
    • dataSchema
    • ioConfig
    • tuningConfig
    • partitionsSpec
    • segmentWriteOutMediumFactory
    • Segment pushing modes
  • Firehoses
    • LocalFirehose
    • HttpFirehose
    • IngestSegmentFirehose
    • SqlFirehose
    • InlineFirehose
    • CombiningFirehose

Technology · Use Cases · Powered by Druid · Docs · Community · Download · FAQ

 ·  ·  · 
Copyright © 2019 Apache Software Foundation.
Except where otherwise noted, licensed under CC BY-SA 4.0.
Apache Druid, Druid, and the Druid logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.