I am creating a CRUD application to add records to Cosmos DB. I have a JSON file that I use for the data. When the file contains only one object everything works, but when my JSON file contains multiple objects only the last one is saved to the database. Here is my code:
/// <summary>
/// Imports tickets from an uploaded JSON file and creates one document per ticket.
/// </summary>
/// <param name="jsonFile">Uploaded file; its name must end with ".json" and its content must be a JSON array of tickets.</param>
/// <param name="tview">Model-bound ticket, passed back to the view when the upload is rejected.</param>
/// <returns>A redirect to "Index" after a successful import; otherwise the current view with <c>ViewBag.Error</c> set.</returns>
[HttpPost]
public async Task<ActionResult> Import(HttpPostedFileBase jsonFile, Ticket tview)
{
    if (jsonFile == null || !Path.GetFileName(jsonFile.FileName).EndsWith(".json"))
    {
        ViewBag.Error = "Invalid file !";
        return View(tview);
    }

    // Resolve the destination once so the save and the read use the same path.
    string savedPath = Server.MapPath("~/JSONFiles/" + Path.GetFileName(jsonFile.FileName));
    jsonFile.SaveAs(savedPath);

    string data;
    // Dispose the reader so the saved file is not left locked after the request.
    using (StreamReader streamReader = new StreamReader(savedPath))
    {
        data = streamReader.ReadToEnd();
    }

    List<Ticket> tickets = JsonConvert.DeserializeObject<List<Ticket>>(data);
    if (tickets == null)
    {
        // An empty file or a literal "null" deserializes to null; treat it as an invalid upload.
        ViewBag.Error = "Invalid file !";
        return View(tview);
    }

    // Persist every ticket. Awaiting inside a plain foreach keeps the action asynchronous
    // end-to-end; List<T>.ForEach with an async lambda would create un-awaited async void calls.
    foreach (Ticket t in tickets)
    {
        Ticket ticket = new Ticket()
        {
            Id = t.Id,
            Requester = t.Requester,
            PrjName = t.PrjName,
            Categorie = t.Categorie,
            ClientName = t.ClientName,
            Service = t.Service,
            Description = t.Description,
            Status = t.Status
        };
        await DocumentDBRepository<Ticket>.CreateItemAsync(ticket);
    }

    // NOTE(review): ViewBag values do not survive a redirect; consider TempData if "Index" should show this message.
    ViewBag.Success = "Data added";
    return RedirectToAction("Index");
}
I think this is because I should change tview to List<Ticket> tview, but to save the data I use this method:
/// <summary>
/// Creates a single document for <paramref name="item"/> in the collection identified by
/// <c>DatabaseId</c> and <c>CollectionId</c>.
/// </summary>
/// <param name="item">The object to store as one document.</param>
/// <returns>The result of the create call, returned as a <c>Document</c>.</returns>
public static async Task<Document> CreateItemAsync(T item)
{
    var collectionUri = UriFactory.CreateDocumentCollectionUri(DatabaseId, CollectionId);
    var response = await client.CreateDocumentAsync(collectionUri, item);
    return response;
}
Thank you for the help
1 Answer 1
CreateDocumentAsync imports only a single document into Cosmos DB per call.
If you want to import multiple documents, this GitHub project would be a good fit for your case.
You need to install the packages below from the NuGet Package Manager.
Use the following commands in the Package Manager Console.
Install-Package Microsoft.Azure.CosmosDB.BulkExecutor -Version 1.1.0
Install-Package Microsoft.Azure.DocumentDB -Version 2.0.0
Install-Package Newtonsoft.Json -Version 9.0.1
You can import multiple documents by using the code below, which is found in Program.cs inside the BulkImportSample folder of the same project:
namespace BulkImportSample
{
    using System;
    using System.Configuration;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.Documents;
    using Microsoft.Azure.Documents.Client;
    using Microsoft.Azure.CosmosDB.BulkExecutor;
    using Microsoft.Azure.CosmosDB.BulkExecutor.BulkImport;

    /// <summary>
    /// Console sample that generates documents in batches and imports them with the
    /// bulk executor, reading all of its settings from the application configuration.
    /// </summary>
    class Program
    {
        private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
        private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
        private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
        private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"];
        private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);

        private static readonly ConnectionPolicy ConnectionPolicy = new ConnectionPolicy
        {
            ConnectionMode = ConnectionMode.Direct,
            ConnectionProtocol = Protocol.Tcp
        };

        private DocumentClient client;

        /// <summary>
        /// Initializes a new instance of the <see cref="Program"/> class.
        /// </summary>
        /// <param name="client">The DocumentDB client instance.</param>
        private Program(DocumentClient client)
        {
            this.client = client;
        }

        /// <summary>
        /// Entry point: traces the target endpoint and collection, then runs the bulk import.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        public static void Main(string[] args)
        {
            Trace.WriteLine("Summary:");
            Trace.WriteLine("--------------------------------------------------------------------- ");
            Trace.WriteLine(String.Format("Endpoint: {0}", EndpointUrl));
            Trace.WriteLine(String.Format("Collection : {0}.{1}", DatabaseName, CollectionName));
            Trace.WriteLine("--------------------------------------------------------------------- ");
            Trace.WriteLine("");

            try
            {
                using (var client = new DocumentClient(
                    new Uri(EndpointUrl),
                    AuthorizationKey,
                    ConnectionPolicy))
                {
                    var program = new Program(client);
                    // Blocking here is acceptable: this is the synchronous console entry point.
                    program.RunBulkImportAsync().Wait();
                }
            }
            catch (AggregateException e)
            {
                Trace.TraceError("Caught AggregateException in Main, Inner Exception:\n" + e);
                Console.ReadKey();
            }
        }

        /// <summary>
        /// Driver function for bulk import: prepares the collection, imports the configured
        /// number of generated documents batch by batch, and traces per-batch and overall summaries.
        /// </summary>
        /// <returns>A task that completes when all batches were attempted and optional cleanup ran.</returns>
        private async Task RunBulkImportAsync()
        {
            // Cleanup on start if set in config.
            DocumentCollection dataCollection = null;
            try
            {
                if (bool.Parse(ConfigurationManager.AppSettings["ShouldCleanupOnStart"]))
                {
                    Database database = Utils.GetDatabaseIfExists(client, DatabaseName);
                    if (database != null)
                    {
                        await client.DeleteDatabaseAsync(database.SelfLink);
                    }

                    Trace.TraceInformation("Creating database {0}", DatabaseName);
                    database = await client.CreateDatabaseAsync(new Database { Id = DatabaseName });

                    Trace.TraceInformation(String.Format("Creating collection {0} with {1} RU/s", CollectionName, CollectionThroughput));
                    dataCollection = await Utils.CreatePartitionedCollectionAsync(client, DatabaseName, CollectionName, CollectionThroughput);
                }
                else
                {
                    dataCollection = Utils.GetCollectionIfExists(client, DatabaseName, CollectionName);
                    if (dataCollection == null)
                    {
                        throw new Exception("The data collection does not exist");
                    }
                }
            }
            catch (Exception de)
            {
                Trace.TraceError("Unable to initialize, exception message: {0}", de.Message);
                throw;
            }

            // Prepare for bulk import.
            // Creating documents with simple partition key here.
            string partitionKeyProperty = dataCollection.PartitionKey.Paths[0].Replace("/", "");

            long numberOfDocumentsToGenerate = long.Parse(ConfigurationManager.AppSettings["NumberOfDocumentsToImport"]);
            int numberOfBatches = int.Parse(ConfigurationManager.AppSettings["NumberOfBatches"]);
            long numberOfDocumentsPerBatch = (long)Math.Floor(((double)numberOfDocumentsToGenerate) / numberOfBatches);

            // Set retry options high for initialization (default values).
            client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
            client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;

            IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
            await bulkExecutor.InitializeAsync();

            // Set retries to 0 to pass control to bulk executor.
            client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
            client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;

            long totalNumberOfDocumentsInserted = 0;
            double totalRequestUnitsConsumed = 0;
            double totalTimeTakenSec = 0;

            var tokenSource = new CancellationTokenSource();
            var token = tokenSource.Token;

            for (int i = 0; i < numberOfBatches; i++)
            {
                // Stable per-iteration copy for use inside the task lambda below.
                int batchIndex = i;

                // Generate JSON-serialized documents to import.
                List<string> documentsToImportInBatch = new List<string>();
                long prefix = i * numberOfDocumentsPerBatch;

                Trace.TraceInformation(String.Format("Generating {0} documents to import for batch {1}", numberOfDocumentsPerBatch, i));
                for (int j = 0; j < numberOfDocumentsPerBatch; j++)
                {
                    string partitionKeyValue = (prefix + j).ToString();
                    string id = partitionKeyValue + Guid.NewGuid().ToString();

                    documentsToImportInBatch.Add(Utils.GenerateRandomDocumentString(id, partitionKeyProperty, partitionKeyValue));
                }

                // Invoke bulk import API.
                var tasks = new List<Task>();

                tasks.Add(Task.Run(async () =>
                {
                    Trace.TraceInformation(String.Format("Executing bulk import for batch {0}", batchIndex));

                    // Per-batch response: a failed batch must neither dereference null nor
                    // re-report (and double-count) the response of an earlier batch.
                    BulkImportResponse batchResponse = null;
                    do
                    {
                        try
                        {
                            batchResponse = await bulkExecutor.BulkImportAsync(
                                documents: documentsToImportInBatch,
                                enableUpsert: true,
                                disableAutomaticIdGeneration: true,
                                maxConcurrencyPerPartitionKeyRange: null,
                                maxInMemorySortingBatchSize: null,
                                cancellationToken: token);
                        }
                        catch (DocumentClientException de)
                        {
                            Trace.TraceError("Document client exception: {0}", de);
                            break;
                        }
                        catch (Exception e)
                        {
                            Trace.TraceError("Exception: {0}", e);
                            break;
                        }
                    } while (batchResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);

                    if (batchResponse == null)
                    {
                        // The very first attempt threw, so there is nothing to summarize for this batch.
                        Trace.TraceError("Bulk import for batch {0} failed before any response was received; skipping its summary.", batchIndex);
                        return;
                    }

                    Trace.WriteLine(String.Format("\nSummary for batch {0}:", batchIndex));
                    Trace.WriteLine("--------------------------------------------------------------------- ");
                    Trace.WriteLine(String.Format("Inserted {0} docs @ {1} writes/s, {2} RU/s in {3} sec",
                        batchResponse.NumberOfDocumentsImported,
                        Math.Round(batchResponse.NumberOfDocumentsImported / batchResponse.TotalTimeTaken.TotalSeconds),
                        Math.Round(batchResponse.TotalRequestUnitsConsumed / batchResponse.TotalTimeTaken.TotalSeconds),
                        batchResponse.TotalTimeTaken.TotalSeconds));
                    Trace.WriteLine(String.Format("Average RU consumption per document: {0}",
                        (batchResponse.TotalRequestUnitsConsumed / batchResponse.NumberOfDocumentsImported)));
                    Trace.WriteLine("---------------------------------------------------------------------\n ");

                    totalNumberOfDocumentsInserted += batchResponse.NumberOfDocumentsImported;
                    totalRequestUnitsConsumed += batchResponse.TotalRequestUnitsConsumed;
                    totalTimeTakenSec += batchResponse.TotalTimeTaken.TotalSeconds;
                },
                token));

                // Optional second task (disabled): lets the user cancel the import by pressing 'c'.
                /*
                tasks.Add(Task.Run(() =>
                {
                    char ch = Console.ReadKey(true).KeyChar;
                    if (ch == 'c' || ch == 'C')
                    {
                        tokenSource.Cancel();
                        Trace.WriteLine("\nTask cancellation requested.");
                    }
                }));
                */

                // Each batch is awaited before the next one starts, so the totals above are
                // never updated concurrently.
                await Task.WhenAll(tasks);
            }

            Trace.WriteLine("Overall summary:");
            Trace.WriteLine("--------------------------------------------------------------------- ");
            Trace.WriteLine(String.Format("Inserted {0} docs @ {1} writes/s, {2} RU/s in {3} sec",
                totalNumberOfDocumentsInserted,
                Math.Round(totalNumberOfDocumentsInserted / totalTimeTakenSec),
                Math.Round(totalRequestUnitsConsumed / totalTimeTakenSec),
                totalTimeTakenSec));
            Trace.WriteLine(String.Format("Average RU consumption per document: {0}",
                (totalRequestUnitsConsumed / totalNumberOfDocumentsInserted)));
            Trace.WriteLine("--------------------------------------------------------------------- ");

            // Cleanup on finish if set in config.
            if (bool.Parse(ConfigurationManager.AppSettings["ShouldCleanupOnFinish"]))
            {
                Trace.TraceInformation("Deleting Database {0}", DatabaseName);
                await client.DeleteDatabaseAsync(UriFactory.CreateDatabaseUri(DatabaseName));
            }

            Trace.WriteLine("\nPress any key to exit.");
            Console.ReadKey();
        }
    }
}
5 Comments
tickets.ForEach(async t => {}); and then when I try to do the import I get a new error: "An asynchronous module or handler completed while an asynchronous operation was still pending." await DocumentDBRepository<Ticket>.CreateItemAsync(tview); The 'await' operator can only be used within an async lambda expression. Consider marking this lambda expression with the 'async' modifier. Explore related questions
See similar questions with these tags.
await DocumentDBRepository<Ticket>.CreateItemAsync(tview); inside tickets.ForEach, just below the tview = ticket;