You can use the **cursor methods** **limit** and **skip** for this.
First, create the variables in the pipeline as shown below.
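The pipeline needs three Integer variables, which appear in the `variables` section of the pipeline JSON at the end of this answer:

```
"variables": {
    "count": { "type": "Integer" },
    "rowsWritten": { "type": "Integer" },
    "temp": { "type": "Integer" }
}
```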

Initialize the `count` and `rowsWritten` variables to `0` with the `@int('0')` expression using Set variable activities.
Next, use an **Until activity** with the dynamic expression below.
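The loop runs until a copy iteration reads zero rows (this is the expression on the Until activity in the pipeline JSON below):

```
@equals(variables('rowsWritten'), 0)
```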

Inside the Until activity, add a copy activity with Cosmos DB for MongoDB as the source and blob storage as the sink.
In the copy activity source, pass the `count` variable to the `skip` cursor method and set the `limit` to `100`.
Here, I have given the limit as `2`:
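The source configuration from the pipeline JSON below looks like this:

```
"source": {
    "type": "CosmosDbMongoDbApiSource",
    "batchSize": 100,
    "cursorMethods": {
        "skip": {
            "value": "@variables('count')",
            "type": "Expression"
        },
        "limit": 2
    }
}
```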

Use a dataset parameter for the filename and give it the expression below:
```
File@{variables('count')}.json
```

Now, use a Set variable activity to increment the `count` variable and store the result in the `temp` variable. Here, I incremented by `2`, but you need to increment by `100` to match your limit.
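For a limit of `100`, the expression on this Set variable activity would be:

```
@add(variables('count'), 100)
```

(The sample pipeline JSON below uses `@add(variables('count'), 2)` to match the limit of `2`.)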

Then store it back into the `count` variable with another Set variable activity (the intermediate `temp` variable is needed because a Set variable activity cannot reference the variable it is setting):
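```
@variables('temp')
```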

Update the `rowsWritten` variable with the number of rows read by the current copy activity:
```
@activity('Copy data2').output.rowsRead
```

When this value is `0`, the Until activity stops, which means all of the source data has been copied to blob storage as a series of `File<count>.json` files.

This is the pipeline JSON for reference:
```
{
"name": "pipeline1",
"properties": {
"activities": [
{
"name": "Set variable1",
"type": "SetVariable",
"dependsOn": [],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "count",
"value": {
"value": "@int('0')",
"type": "Expression"
}
}
},
{
"name": "Set variable2",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Set variable1",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "rowsWritten",
"value": {
"value": "@int('0')",
"type": "Expression"
}
}
},
{
"name": "Until1",
"type": "Until",
"dependsOn": [
{
"activity": "Set variable2",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"expression": {
"value": "@equals(variables('rowsWritten'), 0)",
"type": "Expression"
},
"activities": [
{
"name": "Copy data2",
"type": "Copy",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "CosmosDbMongoDbApiSource",
"batchSize": 100,
"cursorMethods": {
"skip": {
"value": "@variables('count')",
"type": "Expression"
},
"limit": 2
}
},
"sink": {
"type": "JsonSink",
"storeSettings": {
"type": "AzureBlobStorageWriteSettings"
},
"formatSettings": {
"type": "JsonWriteSettings"
}
},
"enableStaging": false
},
"inputs": [
{
"referenceName": "CosmosDbMongoDbCollection1",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "Json1",
"type": "DatasetReference",
"parameters": {
"FileName": {
"value": "File@{variables('count')}.json",
"type": "Expression"
}
}
}
]
},
{
"name": "Set variable3",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Copy data2",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "temp",
"value": {
"value": "@add(variables('count'), 2)",
"type": "Expression"
}
}
},
{
"name": "Set variable4",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Set variable3",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "count",
"value": {
"value": "@variables('temp')",
"type": "Expression"
}
}
},
{
"name": "Set variable5",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Set variable4",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "rowsWritten",
"value": {
"value": "@activity('Copy data2').output.rowsRead",
"type": "Expression"
}
}
}
],
"timeout": "0.12:00:00"
}
}
],
"variables": {
"count": {
"type": "Integer"
},
"rowsWritten": {
"type": "Integer"
},
"temp": {
"type": "Integer"
}
},
"annotations": []
}
}
```
I also struggled a bit with this, especially getting around the size limits of the Lookup activity, since we have a LOT of data to migrate. I ended up creating a JSON file with a list of timestamp ranges to query the Cosmos data with; for each range, I look up the document IDs in that range, and for each of those IDs, I copy the full document data and save it to a path such as `PartitionKey/DocumentID`. Here are the pipelines I created:
**LookupTimestamps** - loops through each timestamp range from a `times.json` file and, for each range, executes the ExportFromCosmos pipeline:
```
{
"name": "LookupTimestamps",
"properties": {
"activities": [
{
"name": "LookupTimestamps",
"type": "Lookup",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": false
},
"dataset": {
"referenceName": "BlobStorageTimestamps",
"type": "DatasetReference"
},
"firstRowOnly": false
}
},
{
"name": "ForEachTimestamp",
"type": "ForEach",
"dependsOn": [
{
"activity": "LookupTimestamps",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"items": {
"value": "@activity('LookupTimestamps').output.value",
"type": "Expression"
},
"isSequential": false,
"activities": [
{
"name": "Execute Pipeline1",
"type": "ExecutePipeline",
"typeProperties": {
"pipeline": {
"referenceName": "ExportFromCosmos",
"type": "PipelineReference"
},
"waitOnCompletion": true,
"parameters": {
"From": {
"value": "@{item().From}",
"type": "Expression"
},
"To": {
"value": "@{item().To}",
"type": "Expression"
}
}
}
}
]
}
}
]
},
"type": "Microsoft.DataFactory/factories/pipelines"
}
```
**ExportFromCosmos** - nested pipeline that's executed from the pipeline above. This is to get around the fact that you can't have nested ForEach activities:
```
{
"name": "ExportFromCosmos",
"properties": {
"activities": [
{
"name": "LookupDocuments",
"type": "Lookup",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "DocumentDbCollectionSource",
"query": {
"value": "select c.id, c.partitionKey from c where c._ts >= @{pipeline().parameters.from} and c._ts <= @{pipeline().parameters.to} order by c._ts desc",
"type": "Expression"
},
"nestingSeparator": "."
},
"dataset": {
"referenceName": "CosmosDb",
"type": "DatasetReference"
},
"firstRowOnly": false
}
},
{
"name": "ForEachDocument",
"type": "ForEach",
"dependsOn": [
{
"activity": "LookupDocuments",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"items": {
"value": "@activity('LookupDocuments').output.value",
"type": "Expression"
},
"activities": [
{
"name": "Copy1",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "DocumentDbCollectionSource",
"query": {
"value": "select * from c where c.id = \"@{item().id}\" and c.partitionKey = \"@{item().partitionKey}\"",
"type": "Expression"
},
"nestingSeparator": "."
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false
},
"inputs": [
{
"referenceName": "CosmosDb",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "BlobStorageDocuments",
"type": "DatasetReference",
"parameters": {
"id": {
"value": "@item().id",
"type": "Expression"
},
"partitionKey": {
"value": "@item().partitionKey",
"type": "Expression"
}
}
}
]
}
]
}
}
],
"parameters": {
"from": {
"type": "int"
},
"to": {
"type": "int"
}
}
}
}
```
**BlobStorageTimestamps** - dataset for the `times.json` file:
```
{
"name": "BlobStorageTimestamps",
"properties": {
"linkedServiceName": {
"referenceName": "AzureBlobStorage1",
"type": "LinkedServiceReference"
},
"type": "AzureBlob",
"typeProperties": {
"format": {
"type": "JsonFormat",
"filePattern": "arrayOfObjects"
},
"fileName": "times.json",
"folderPath": "mycollection"
}
},
"type": "Microsoft.DataFactory/factories/datasets"
}
```
**BlobStorageDocuments** - dataset for where the documents will be saved:
```
{
"name": "BlobStorageDocuments",
"properties": {
"linkedServiceName": {
"referenceName": "AzureBlobStorage1",
"type": "LinkedServiceReference"
},
"parameters": {
"id": {
"type": "string"
},
"partitionKey": {
"type": "string"
}
},
"type": "AzureBlob",
"typeProperties": {
"format": {
"type": "JsonFormat",
"filePattern": "arrayOfObjects"
},
"fileName": {
"value": "@{dataset().partitionKey}/@{dataset().id}.json",
"type": "Expression"
},
"folderPath": "mycollection"
}
},
"type": "Microsoft.DataFactory/factories/datasets"
}
```
The `times.json` file is just a list of epoch times and looks like this:
```
[{
"From": 1556150400,
"To": 1556236799
},
{
"From": 1556236800,
"To": 1556323199
}]
```