
I am creating a CRUD application to add records to Cosmos DB. I have a JSON file that I use for the data. When there is only one object, everything works, but when my JSON file contains multiple objects, only the last one is saved to the database. Here is my code:

[HttpPost]
public async Task<ActionResult> Import(HttpPostedFileBase jsonFile, Ticket tview)
{
    if (jsonFile == null || !Path.GetFileName(jsonFile.FileName).EndsWith(".json"))
    {
        ViewBag.Error = "Invalid file!";
    }
    else
    {
        jsonFile.SaveAs(Server.MapPath("~/JSONFiles/" + Path.GetFileName(jsonFile.FileName)));
        StreamReader streamReader = new StreamReader(Server.MapPath("~/JSONFiles/" + Path.GetFileName(jsonFile.FileName)));
        string data = streamReader.ReadToEnd();
        List<Ticket> tickets = JsonConvert.DeserializeObject<List<Ticket>>(data);
        tickets.ForEach(t =>
        {
            Ticket ticket = new Ticket()
            {
                Id = t.Id,
                Requester = t.Requester,
                PrjName = t.PrjName,
                Categorie = t.Categorie,
                ClientName = t.ClientName,
                Service = t.Service,
                Description = t.Description,
                Status = t.Status
            };
            tview = ticket;
        });
        await DocumentDBRepository<Ticket>.CreateItemAsync(tview);
        ViewBag.Success = "Data added";
        return RedirectToAction("Index");
    }
    return View(tview);
}

I think I should change tview to List<Ticket> tview, but to save the data I use this method:

public static async Task<Document> CreateItemAsync(T item)
{
    return await client.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(DatabaseId, CollectionId), item);
}

Thank you for the help.

asked Oct 11, 2018 at 11:29
  • Move the line await DocumentDBRepository<Ticket>.CreateItemAsync(tview); inside tickets.ForEach, just below tview = ticket; Commented Oct 11, 2018 at 11:45
  • @ershoaib I have tried that, but I get this error: "An asynchronous module or handler completed while an asynchronous operation was still pending." Commented Oct 11, 2018 at 12:09

1 Answer


You can import only a single document per call to Cosmos DB when using CreateDocumentAsync.
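
For context, the minimal fix for the controller in the question is to await one CreateItemAsync call per deserialized ticket inside a plain foreach loop. This is a sketch, assuming the asker's Ticket model and DocumentDBRepository; note that List<T>.ForEach(async t => ...) creates an async void delegate that the MVC pipeline cannot await, which is exactly what produces the "An asynchronous module or handler completed..." error from the comments.

// Sketch only: one awaited write per document, inside the async action.
List<Ticket> tickets = JsonConvert.DeserializeObject<List<Ticket>>(data);
foreach (Ticket ticket in tickets)
{
    // One round trip per document; fine for small files,
    // slow for large ones (see the bulk import sample below).
    await DocumentDBRepository<Ticket>.CreateItemAsync(ticket);
}
ViewBag.Success = "Data added";
return RedirectToAction("Index");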

If you want to import multiple documents, this GitHub project would be a great fit for your case.

You need to install the packages below from the NuGet Package Manager.

Run these commands in the Package Manager Console:

Install-Package Microsoft.Azure.CosmosDB.BulkExecutor -Version 1.1.0

Install-Package Microsoft.Azure.DocumentDB -Version 2.0.0

Install-Package Newtonsoft.Json -Version 9.0.1
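
Every setting the sample reads comes from appSettings. Here is a minimal App.config sketch; the key names are taken from the code below, and the values are placeholders you must replace with your own:

<configuration>
  <appSettings>
    <!-- Placeholder values; use your own account details. -->
    <add key="EndPointUrl" value="https://your-account.documents.azure.com:443/" />
    <add key="AuthorizationKey" value="your-authorization-key" />
    <add key="DatabaseName" value="db" />
    <add key="CollectionName" value="collection" />
    <add key="CollectionThroughput" value="10000" />
    <add key="ShouldCleanupOnStart" value="false" />
    <add key="ShouldCleanupOnFinish" value="false" />
    <add key="NumberOfDocumentsToImport" value="10000" />
    <add key="NumberOfBatches" value="10" />
  </appSettings>
</configuration>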

You will find that you can import multiple documents by using the code below, from Program.cs in the BulkImportSample folder of that project.

namespace BulkImportSample
{
    using System;
    using System.Configuration;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.Documents;
    using Microsoft.Azure.Documents.Client;
    using Microsoft.Azure.CosmosDB.BulkExecutor;
    using Microsoft.Azure.CosmosDB.BulkExecutor.BulkImport;

    class Program
    {
        private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
        private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
        private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
        private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"];
        private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);

        private static readonly ConnectionPolicy ConnectionPolicy = new ConnectionPolicy
        {
            ConnectionMode = ConnectionMode.Direct,
            ConnectionProtocol = Protocol.Tcp
        };

        private DocumentClient client;

        /// <summary>
        /// Initializes a new instance of the <see cref="Program"/> class.
        /// </summary>
        /// <param name="client">The DocumentDB client instance.</param>
        private Program(DocumentClient client)
        {
            this.client = client;
        }

        public static void Main(string[] args)
        {
            Trace.WriteLine("Summary:");
            Trace.WriteLine("--------------------------------------------------------------------- ");
            Trace.WriteLine(String.Format("Endpoint: {0}", EndpointUrl));
            Trace.WriteLine(String.Format("Collection : {0}.{1}", DatabaseName, CollectionName));
            Trace.WriteLine("--------------------------------------------------------------------- ");
            Trace.WriteLine("");

            try
            {
                using (var client = new DocumentClient(
                    new Uri(EndpointUrl),
                    AuthorizationKey,
                    ConnectionPolicy))
                {
                    var program = new Program(client);
                    program.RunBulkImportAsync().Wait();
                }
            }
            catch (AggregateException e)
            {
                Trace.TraceError("Caught AggregateException in Main, Inner Exception:\n" + e);
                Console.ReadKey();
            }
        }

        /// <summary>
        /// Driver function for bulk import.
        /// </summary>
        /// <returns></returns>
        private async Task RunBulkImportAsync()
        {
            // Cleanup on start if set in config.
            DocumentCollection dataCollection = null;
            try
            {
                if (bool.Parse(ConfigurationManager.AppSettings["ShouldCleanupOnStart"]))
                {
                    Database database = Utils.GetDatabaseIfExists(client, DatabaseName);
                    if (database != null)
                    {
                        await client.DeleteDatabaseAsync(database.SelfLink);
                    }

                    Trace.TraceInformation("Creating database {0}", DatabaseName);
                    database = await client.CreateDatabaseAsync(new Database { Id = DatabaseName });

                    Trace.TraceInformation(String.Format("Creating collection {0} with {1} RU/s", CollectionName, CollectionThroughput));
                    dataCollection = await Utils.CreatePartitionedCollectionAsync(client, DatabaseName, CollectionName, CollectionThroughput);
                }
                else
                {
                    dataCollection = Utils.GetCollectionIfExists(client, DatabaseName, CollectionName);
                    if (dataCollection == null)
                    {
                        throw new Exception("The data collection does not exist");
                    }
                }
            }
            catch (Exception de)
            {
                Trace.TraceError("Unable to initialize, exception message: {0}", de.Message);
                throw;
            }

            // Prepare for bulk import.
            // Creating documents with simple partition key here.
            string partitionKeyProperty = dataCollection.PartitionKey.Paths[0].Replace("/", "");
            long numberOfDocumentsToGenerate = long.Parse(ConfigurationManager.AppSettings["NumberOfDocumentsToImport"]);
            int numberOfBatches = int.Parse(ConfigurationManager.AppSettings["NumberOfBatches"]);
            long numberOfDocumentsPerBatch = (long)Math.Floor(((double)numberOfDocumentsToGenerate) / numberOfBatches);

            // Set retry options high for initialization (default values).
            client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
            client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;

            IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
            await bulkExecutor.InitializeAsync();

            // Set retries to 0 to pass control to bulk executor.
            client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
            client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;

            BulkImportResponse bulkImportResponse = null;
            long totalNumberOfDocumentsInserted = 0;
            double totalRequestUnitsConsumed = 0;
            double totalTimeTakenSec = 0;

            var tokenSource = new CancellationTokenSource();
            var token = tokenSource.Token;

            for (int i = 0; i < numberOfBatches; i++)
            {
                // Generate JSON-serialized documents to import.
                List<string> documentsToImportInBatch = new List<string>();
                long prefix = i * numberOfDocumentsPerBatch;

                Trace.TraceInformation(String.Format("Generating {0} documents to import for batch {1}", numberOfDocumentsPerBatch, i));
                for (int j = 0; j < numberOfDocumentsPerBatch; j++)
                {
                    string partitionKeyValue = (prefix + j).ToString();
                    string id = partitionKeyValue + Guid.NewGuid().ToString();
                    documentsToImportInBatch.Add(Utils.GenerateRandomDocumentString(id, partitionKeyProperty, partitionKeyValue));
                }

                // Invoke bulk import API.
                var tasks = new List<Task>();
                tasks.Add(Task.Run(async () =>
                {
                    Trace.TraceInformation(String.Format("Executing bulk import for batch {0}", i));
                    do
                    {
                        try
                        {
                            bulkImportResponse = await bulkExecutor.BulkImportAsync(
                                documents: documentsToImportInBatch,
                                enableUpsert: true,
                                disableAutomaticIdGeneration: true,
                                maxConcurrencyPerPartitionKeyRange: null,
                                maxInMemorySortingBatchSize: null,
                                cancellationToken: token);
                        }
                        catch (DocumentClientException de)
                        {
                            Trace.TraceError("Document client exception: {0}", de);
                            break;
                        }
                        catch (Exception e)
                        {
                            Trace.TraceError("Exception: {0}", e);
                            break;
                        }
                    } while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);

                    Trace.WriteLine(String.Format("\nSummary for batch {0}:", i));
                    Trace.WriteLine("--------------------------------------------------------------------- ");
                    Trace.WriteLine(String.Format("Inserted {0} docs @ {1} writes/s, {2} RU/s in {3} sec",
                        bulkImportResponse.NumberOfDocumentsImported,
                        Math.Round(bulkImportResponse.NumberOfDocumentsImported / bulkImportResponse.TotalTimeTaken.TotalSeconds),
                        Math.Round(bulkImportResponse.TotalRequestUnitsConsumed / bulkImportResponse.TotalTimeTaken.TotalSeconds),
                        bulkImportResponse.TotalTimeTaken.TotalSeconds));
                    Trace.WriteLine(String.Format("Average RU consumption per document: {0}",
                        (bulkImportResponse.TotalRequestUnitsConsumed / bulkImportResponse.NumberOfDocumentsImported)));
                    Trace.WriteLine("---------------------------------------------------------------------\n ");

                    totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
                    totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;
                    totalTimeTakenSec += bulkImportResponse.TotalTimeTaken.TotalSeconds;
                },
                token));

                /*
                tasks.Add(Task.Run(() =>
                {
                    char ch = Console.ReadKey(true).KeyChar;
                    if (ch == 'c' || ch == 'C')
                    {
                        tokenSource.Cancel();
                        Trace.WriteLine("\nTask cancellation requested.");
                    }
                }));
                */

                await Task.WhenAll(tasks);
            }

            Trace.WriteLine("Overall summary:");
            Trace.WriteLine("--------------------------------------------------------------------- ");
            Trace.WriteLine(String.Format("Inserted {0} docs @ {1} writes/s, {2} RU/s in {3} sec",
                totalNumberOfDocumentsInserted,
                Math.Round(totalNumberOfDocumentsInserted / totalTimeTakenSec),
                Math.Round(totalRequestUnitsConsumed / totalTimeTakenSec),
                totalTimeTakenSec));
            Trace.WriteLine(String.Format("Average RU consumption per document: {0}",
                (totalRequestUnitsConsumed / totalNumberOfDocumentsInserted)));
            Trace.WriteLine("--------------------------------------------------------------------- ");

            // Cleanup on finish if set in config.
            if (bool.Parse(ConfigurationManager.AppSettings["ShouldCleanupOnFinish"]))
            {
                Trace.TraceInformation("Deleting Database {0}", DatabaseName);
                await client.DeleteDatabaseAsync(UriFactory.CreateDatabaseUri(DatabaseName));
            }

            Trace.WriteLine("\nPress any key to exit.");
            Console.ReadKey();
        }
    }
}
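
To tie this back to the question: instead of the sample's randomly generated documents, you can serialize your deserialized Ticket list and pass it to the same BulkImportAsync call. A sketch, assuming bulkExecutor has been created and initialized exactly as in RunBulkImportAsync above (requires System.Linq and Newtonsoft.Json):

// Sketch only: bulk-import the tickets from the uploaded JSON file.
List<Ticket> tickets = JsonConvert.DeserializeObject<List<Ticket>>(data);
List<string> documents = tickets.Select(t => JsonConvert.SerializeObject(t)).ToList();

BulkImportResponse response = await bulkExecutor.BulkImportAsync(
    documents: documents,
    enableUpsert: true,                  // overwrite documents that share an id
    disableAutomaticIdGeneration: false, // fill in missing ids automatically
    cancellationToken: CancellationToken.None);

Trace.WriteLine(String.Format("Imported {0} of {1} tickets",
    response.NumberOfDocumentsImported, tickets.Count));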
answered Oct 11, 2018 at 11:54

Comments

I have tried putting it in the ForEach, but I had an error; I had to make the lambda async (tickets.ForEach(async t => {});), and then when I try the import I get a new error: "An asynchronous module or handler completed while an asynchronous operation was still pending."
Don't make the ForEach async; leave it as it is. Just use my code as it is and let me know.
Then I get this error on await DocumentDBRepository<Ticket>.CreateItemAsync(tview);: "The 'await' operator can only be used within an async lambda expression. Consider marking this lambda expression with the 'async' modifier."
@Kamil I've updated my answer and found an awesome library for importing multiple documents. Please go through the answer; I hope it solves your problem :)
