CopyPastor

Detecting plagiarism made easy.

Score: 0.8513870239257812; Reported for: String similarity

Possible Plagiarism

Plagiarized on 2023-12-05
by Balaji

Original Post

Original - Posted on 2019-04-30
by Tom



            
Present in both answers; Present only in the new answer; Present only in the old answer;

You can use **Cursor** methods **limit** and **skip** as per your requirement.
Create variables in the pipeline as shown below:
![enter image description here](https://i.imgur.com/3RVdHzM.png)
Initialize the **count** and **rowsWritten** variables to `0` with the expression `@int('0')`, using Set variable activities.
Next, add an **Until activity** with the dynamic expression `@equals(variables('rowsWritten'), 0)`.
![enter image description here](https://i.imgur.com/gFa9IJQ.png)
Inside the Until activity, add a copy activity with the Cosmos DB MongoDB collection as the source and Blob storage as the sink.
In the copy activity source, pass the `count` variable as `skip` and set `limit` to `100`.
For this demo I set the limit to 2: ![enter image description here](https://i.imgur.com/rBC2CyH.png)
Use a dataset parameter for the file name and give it the expression below.
```
File@{variables('count')}.json
```
![enter image description here](https://i.imgur.com/fGPA1rE.png)
Now, use a Set variable activity to increment `count` and store the result in a `temp` variable, because a Set variable activity cannot reference the variable it is setting. Here I incremented by 2, but you need to increment by `100` so that the step matches the `limit`.
![enter image description here](https://i.imgur.com/gDXv3Uw.png)
Then store `temp` back into the `count` variable.
![enter image description here](https://i.imgur.com/205kVVD.png)
Update the `rowsWritten` variable with the number of rows read by the current copy activity.
```
@activity('Copy data2').output.rowsRead
```
![enter image description here](https://i.imgur.com/6ZOrdZR.png)
When this value is `0`, the Until activity stops, which means all the source data has been copied, as shown below. ![enter image description here](https://i.imgur.com/9GSykJH.png)
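The loop described above can be sketched in plain Python to check the control flow before building it in the pipeline. This is only an in-memory model, not Data Factory code: `copy_page` and `export_in_pages` are hypothetical names, the source is a list, and the sketch assumes no file is written for the final empty page (what the real copy activity does on that last pass is not modelled here).

```python
def copy_page(source, skip, limit):
    """Stand-in for the copy activity: the documents a skip/limit cursor would read."""
    return source[skip:skip + limit]


def export_in_pages(source, limit):
    """Mimic the Until loop: copy a page, advance the offset, stop after an empty page."""
    count = 0   # plays the role of the `count` pipeline variable
    files = {}  # file name -> documents written to it
    while True:  # Until checks its condition after each iteration
        page = copy_page(source, count, limit)
        rows_read = len(page)  # plays the role of `rowsWritten`
        if page:
            files[f"File{count}.json"] = page
        count += limit  # the increment must equal the limit
        if rows_read == 0:
            break
    return files


docs = [{"id": i} for i in range(5)]
files = export_in_pages(docs, limit=2)
print(list(files))  # ['File0.json', 'File2.json', 'File4.json']
```

With five documents and a limit of 2, the loop runs four times: three passes that read data and one last pass that reads nothing and ends the loop.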
This is the pipeline JSON for reference:
```
{
  "name": "pipeline1",
  "properties": {
    "activities": [
      {
        "name": "Set variable1",
        "type": "SetVariable",
        "dependsOn": [],
        "policy": { "secureOutput": false, "secureInput": false },
        "userProperties": [],
        "typeProperties": {
          "variableName": "count",
          "value": { "value": "@int('0')", "type": "Expression" }
        }
      },
      {
        "name": "Set variable2",
        "type": "SetVariable",
        "dependsOn": [
          { "activity": "Set variable1", "dependencyConditions": [ "Succeeded" ] }
        ],
        "policy": { "secureOutput": false, "secureInput": false },
        "userProperties": [],
        "typeProperties": {
          "variableName": "rowsWritten",
          "value": { "value": "@int('0')", "type": "Expression" }
        }
      },
      {
        "name": "Until1",
        "type": "Until",
        "dependsOn": [
          { "activity": "Set variable2", "dependencyConditions": [ "Succeeded" ] }
        ],
        "userProperties": [],
        "typeProperties": {
          "expression": { "value": "@equals(variables('rowsWritten'), 0)", "type": "Expression" },
          "activities": [
            {
              "name": "Copy data2",
              "type": "Copy",
              "dependsOn": [],
              "policy": {
                "timeout": "0.12:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
              },
              "userProperties": [],
              "typeProperties": {
                "source": {
                  "type": "CosmosDbMongoDbApiSource",
                  "batchSize": 100,
                  "cursorMethods": {
                    "skip": { "value": "@variables('count')", "type": "Expression" },
                    "limit": 2
                  }
                },
                "sink": {
                  "type": "JsonSink",
                  "storeSettings": { "type": "AzureBlobStorageWriteSettings" },
                  "formatSettings": { "type": "JsonWriteSettings" }
                },
                "enableStaging": false
              },
              "inputs": [
                { "referenceName": "CosmosDbMongoDbCollection1", "type": "DatasetReference" }
              ],
              "outputs": [
                {
                  "referenceName": "Json1",
                  "type": "DatasetReference",
                  "parameters": {
                    "FileName": { "value": "File@{variables('count')}.json", "type": "Expression" }
                  }
                }
              ]
            },
            {
              "name": "Set variable3",
              "type": "SetVariable",
              "dependsOn": [
                { "activity": "Copy data2", "dependencyConditions": [ "Succeeded" ] }
              ],
              "policy": { "secureOutput": false, "secureInput": false },
              "userProperties": [],
              "typeProperties": {
                "variableName": "temp",
                "value": { "value": "@add(variables('count'), 2)", "type": "Expression" }
              }
            },
            {
              "name": "Set variable4",
              "type": "SetVariable",
              "dependsOn": [
                { "activity": "Set variable3", "dependencyConditions": [ "Succeeded" ] }
              ],
              "policy": { "secureOutput": false, "secureInput": false },
              "userProperties": [],
              "typeProperties": {
                "variableName": "count",
                "value": { "value": "@variables('temp')", "type": "Expression" }
              }
            },
            {
              "name": "Set variable5",
              "type": "SetVariable",
              "dependsOn": [
                { "activity": "Set variable4", "dependencyConditions": [ "Succeeded" ] }
              ],
              "policy": { "secureOutput": false, "secureInput": false },
              "userProperties": [],
              "typeProperties": {
                "variableName": "rowsWritten",
                "value": { "value": "@activity('Copy data2').output.rowsRead", "type": "Expression" }
              }
            }
          ],
          "timeout": "0.12:00:00"
        }
      }
    ],
    "variables": {
      "count": { "type": "Integer" },
      "rowsWritten": { "type": "Integer" },
      "temp": { "type": "Integer" }
    },
    "annotations": []
  }
}
```
I also struggled a bit with this, especially getting around the size limits of the Lookup activity, since we have a LOT of data to migrate. I ended up creating a JSON file with a list of timestamps to query the Cosmos data with, then for each of those, getting the document IDs in that range, and then for each of those, getting the full document data and saving it to a path such as `PartitionKey/DocumentID`. Here are the pipelines I created:
LookupTimestamps - loops through each timestamp range from a `times.json` file, and for each timestamp, executes the ExportFromCosmos pipeline

    {
      "name": "LookupTimestamps",
      "properties": {
        "activities": [
          {
            "name": "LookupTimestamps",
            "type": "Lookup",
            "policy": {
              "timeout": "7.00:00:00",
              "retry": 0,
              "retryIntervalInSeconds": 30,
              "secureOutput": false,
              "secureInput": false
            },
            "typeProperties": {
              "source": { "type": "BlobSource", "recursive": false },
              "dataset": { "referenceName": "BlobStorageTimestamps", "type": "DatasetReference" },
              "firstRowOnly": false
            }
          },
          {
            "name": "ForEachTimestamp",
            "type": "ForEach",
            "dependsOn": [
              { "activity": "LookupTimestamps", "dependencyConditions": [ "Succeeded" ] }
            ],
            "typeProperties": {
              "items": { "value": "@activity('LookupTimestamps').output.value", "type": "Expression" },
              "isSequential": false,
              "activities": [
                {
                  "name": "Execute Pipeline1",
                  "type": "ExecutePipeline",
                  "typeProperties": {
                    "pipeline": { "referenceName": "ExportFromCosmos", "type": "PipelineReference" },
                    "waitOnCompletion": true,
                    "parameters": {
                      "From": { "value": "@{item().From}", "type": "Expression" },
                      "To": { "value": "@{item().To}", "type": "Expression" }
                    }
                  }
                }
              ]
            }
          }
        ]
      },
      "type": "Microsoft.DataFactory/factories/pipelines"
    }
ExportFromCosmos - nested pipeline that's executed from the above pipeline. This is to get around the fact that you can't have nested ForEach activities.

    {
      "name": "ExportFromCosmos",
      "properties": {
        "activities": [
          {
            "name": "LookupDocuments",
            "type": "Lookup",
            "policy": {
              "timeout": "7.00:00:00",
              "retry": 0,
              "retryIntervalInSeconds": 30,
              "secureOutput": false,
              "secureInput": false
            },
            "typeProperties": {
              "source": {
                "type": "DocumentDbCollectionSource",
                "query": {
                  "value": "select c.id, c.partitionKey from c where c._ts >= @{pipeline().parameters.from} and c._ts <= @{pipeline().parameters.to} order by c._ts desc",
                  "type": "Expression"
                },
                "nestingSeparator": "."
              },
              "dataset": { "referenceName": "CosmosDb", "type": "DatasetReference" },
              "firstRowOnly": false
            }
          },
          {
            "name": "ForEachDocument",
            "type": "ForEach",
            "dependsOn": [
              { "activity": "LookupDocuments", "dependencyConditions": [ "Succeeded" ] }
            ],
            "typeProperties": {
              "items": { "value": "@activity('LookupDocuments').output.value", "type": "Expression" },
              "activities": [
                {
                  "name": "Copy1",
                  "type": "Copy",
                  "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                  },
                  "typeProperties": {
                    "source": {
                      "type": "DocumentDbCollectionSource",
                      "query": {
                        "value": "select * from c where c.id = \"@{item().id}\" and c.partitionKey = \"@{item().partitionKey}\"",
                        "type": "Expression"
                      },
                      "nestingSeparator": "."
                    },
                    "sink": { "type": "BlobSink" },
                    "enableStaging": false
                  },
                  "inputs": [
                    { "referenceName": "CosmosDb", "type": "DatasetReference" }
                  ],
                  "outputs": [
                    {
                      "referenceName": "BlobStorageDocuments",
                      "type": "DatasetReference",
                      "parameters": {
                        "id": { "value": "@item().id", "type": "Expression" },
                        "partitionKey": { "value": "@item().partitionKey", "type": "Expression" }
                      }
                    }
                  ]
                }
              ]
            }
          }
        ],
        "parameters": {
          "from": { "type": "int" },
          "to": { "type": "int" }
        }
      }
    }
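Taken together, the two pipelines behave roughly like a nested loop: an outer pass over the time ranges and an inner pass over the documents found in each range. The Python sketch below models that flow in memory only. The function names and sample documents are made up for illustration, and the blob path simply mirrors the `mycollection` folder and the `partitionKey/id.json` file name used by the BlobStorageDocuments dataset.

```python
def lookup_ids(docs, ts_from, ts_to):
    """Stand-in for LookupDocuments: id and partitionKey of documents whose
    _ts falls in [ts_from, ts_to], newest first."""
    hits = [d for d in docs if ts_from <= d["_ts"] <= ts_to]
    hits.sort(key=lambda d: d["_ts"], reverse=True)
    return [{"id": d["id"], "partitionKey": d["partitionKey"]} for d in hits]


def export(docs, time_ranges):
    """Outer loop over time ranges, inner loop over documents.
    Returns a mapping of blob path -> full document."""
    blobs = {}
    for window in time_ranges:  # ForEachTimestamp
        for ref in lookup_ids(docs, window["From"], window["To"]):  # ForEachDocument
            # Stand-in for Copy1: fetch the full document by id and partition key.
            full = next(
                d for d in docs
                if d["id"] == ref["id"] and d["partitionKey"] == ref["partitionKey"]
            )
            blobs[f"mycollection/{ref['partitionKey']}/{ref['id']}.json"] = full
    return blobs


# Hypothetical sample data; the third document falls outside both ranges.
docs = [
    {"id": "a1", "partitionKey": "p1", "_ts": 1556150400},
    {"id": "b2", "partitionKey": "p2", "_ts": 1556236800},
    {"id": "c3", "partitionKey": "p1", "_ts": 1556400000},
]
times = [
    {"From": 1556150400, "To": 1556236799},
    {"From": 1556236800, "To": 1556323199},
]
blobs = export(docs, times)
print(sorted(blobs))  # ['mycollection/p1/a1.json', 'mycollection/p2/b2.json']
```

Because each range triggers its own lookup, the ranges should be narrow enough that a single lookup result stays within the Lookup activity's size limits.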
BlobStorageTimestamps - dataset for the `times.json` file

    {
      "name": "BlobStorageTimestamps",
      "properties": {
        "linkedServiceName": { "referenceName": "AzureBlobStorage1", "type": "LinkedServiceReference" },
        "type": "AzureBlob",
        "typeProperties": {
          "format": { "type": "JsonFormat", "filePattern": "arrayOfObjects" },
          "fileName": "times.json",
          "folderPath": "mycollection"
        }
      },
      "type": "Microsoft.DataFactory/factories/datasets"
    }
BlobStorageDocuments - dataset for where the documents will be saved

    {
      "name": "BlobStorageDocuments",
      "properties": {
        "linkedServiceName": { "referenceName": "AzureBlobStorage1", "type": "LinkedServiceReference" },
        "parameters": {
          "id": { "type": "string" },
          "partitionKey": { "type": "string" }
        },
        "type": "AzureBlob",
        "typeProperties": {
          "format": { "type": "JsonFormat", "filePattern": "arrayOfObjects" },
          "fileName": {
            "value": "@{dataset().partitionKey}/@{dataset().id}.json",
            "type": "Expression"
          },
          "folderPath": "mycollection"
        }
      },
      "type": "Microsoft.DataFactory/factories/datasets"
    }
The `times.json` file is just a list of epoch time ranges and looks like this:

    [
      { "From": 1556150400, "To": 1556236799 },
      { "From": 1556236800, "To": 1556323199 }
    ]
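The two sample entries are consecutive UTC days (2019-04-25 and 2019-04-26) with inclusive bounds, so a file like this can be generated rather than typed by hand. Here is a minimal sketch, assuming one range per day is fine-grained enough for your data; `daily_ranges` is a hypothetical helper name, not part of any Azure tooling.

```python
import json
from datetime import datetime, timedelta, timezone


def daily_ranges(start, days):
    """Return one {"From", "To"} entry per UTC day, with inclusive epoch-second bounds."""
    ranges = []
    for i in range(days):
        day_start = start + timedelta(days=i)
        day_end = day_start + timedelta(days=1) - timedelta(seconds=1)
        ranges.append({
            "From": int(day_start.timestamp()),
            "To": int(day_end.timestamp()),
        })
    return ranges


start = datetime(2019, 4, 25, tzinfo=timezone.utc)
times = daily_ranges(start, 2)
print(json.dumps(times))
# [{"From": 1556150400, "To": 1556236799}, {"From": 1556236800, "To": 1556323199}]
```

If a single day holds too many documents for one lookup, the same idea works with a shorter step such as an hour.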

        