本文介绍:

一种通过Azure Functions 同步 blob的方法。

 

在之前的内容中,我们分享过Azure Functions+azcopy的同步方式,今天我们介绍在Functions中使用blob sdk进行同步,使用该方案,可以将Functions 同时部署到云端或边缘侧。

 

示例代码:https://github.com/sean8544/azure-blob-sync-by-azure-function-blob-trigger

 

using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;

namespace Company.Function
{
    public static class BlobTrigger1
    {
        [FunctionName("BlobTrigger1")]
        public static async Task Run([BlobTrigger("%source-container-name%/{name}", Connection = "source-storage-connection-string")]Stream myBlob, string name, ILogger log)
        {
            log.LogInformation($"C# Blob trigger function Processed blob\n Name:{name} \n Size: {myBlob.Length} Bytes");

            string SourceAccountConnString = System.Environment.GetEnvironmentVariable("source-storage-connection-string");
            string SourceContainerName= System.Environment.GetEnvironmentVariable("source-container-name");
            string TargetAccountConnString = System.Environment.GetEnvironmentVariable("target-storage-connection-string");
            string TargetContainerName = System.Environment.GetEnvironmentVariable("target-container-name");

            BlobContainerClient SourceContainerClient=new BlobContainerClient(SourceAccountConnString,SourceContainerName);
            BlobContainerClient TargetContainerClient=new BlobContainerClient(TargetAccountConnString,TargetContainerName);

            await CopyBlobAsync(SourceContainerClient,TargetContainerClient, name,log);
            

        }





        private static async Task CopyBlobAsync(BlobContainerClient sourceContainer,BlobContainerClient targetContainer,string blobName,ILogger log)
{
    try
    {
        // Get the name of the first blob in the container to use as the source.
        //string blobName = sourceContainer.GetBlobs().FirstOrDefault().Name;

        // Create a BlobClient representing the source blob to copy.
        BlobClient sourceBlob = sourceContainer.GetBlobClient(blobName);

        // Get a BlobClient representing the destination blob.
        BlobClient destBlob = targetContainer.GetBlobClient( blobName);


        // Ensure that the source blob exists.
        if (await sourceBlob.ExistsAsync())
        {
            // Lease the source blob for the copy operation 
            // to prevent another client from modifying it.
            BlobLeaseClient lease = sourceBlob.GetBlobLeaseClient();


            try{
                // Specifying -1 for the lease interval creates an infinite lease.
                await lease.AcquireAsync(TimeSpan.FromSeconds(-1));
            }
            catch(Exception ex)
            {
                log.LogError($"Set lease error:{ex.Message}");
            }
            

            // Get the source blob's properties and display the lease state.
            BlobProperties sourceProperties = await sourceBlob.GetPropertiesAsync();
            log.LogInformation($"Lease state: {sourceProperties.LeaseState}");

            //Generate  sas uri for the source blob inorder to copy blob from different storage account
            var sourceSas = sourceBlob.GenerateSasUri(Azure.Storage.Sas.BlobSasPermissions.Read, DateTime.UtcNow.AddMinutes(-10).AddDays(7));

           
            //StartCopyFromUriAsync 只用在从云端storage 拷贝到 云端 storage的情况,如果从云端 拷贝到 iot edge,则可以使用 先download 后upload的方式
            // Start the copy operation. This works only when copy cloud to cloud
            await destBlob.StartCopyFromUriAsync(sourceSas);

            //uncommon bellow code if you want to sync from cloud to IoT Edge
            
            //await sourceBlob.DownloadToAsync(sourceBlob.Name);
            //await destBlob.UploadAsync(sourceBlob.Name);

            // Get the destination blob's properties and display the copy status.
            BlobProperties destProperties = await destBlob.GetPropertiesAsync();

            log.LogInformation($"{blobName} Copy status: {destProperties.CopyStatus},Copy progress: {destProperties.CopyProgress},Completion time: {destProperties.CopyCompletedOn},Total bytes: {destProperties.ContentLength}");
           

            // Update the source blob's properties.
            sourceProperties = await sourceBlob.GetPropertiesAsync();

            if (sourceProperties.LeaseState == LeaseState.Leased)
            {
                // Break the lease on the source blob.
                await lease.BreakAsync();

                // Update the source blob's properties to check the lease state.
                sourceProperties = await sourceBlob.GetPropertiesAsync();
                log.LogInformation($"{blobName} Lease state: {sourceProperties.LeaseState}");
            }
        }
    }
    catch (RequestFailedException ex)
    {
        log.LogError(ex.Message);
        
    }
}
    }
}

 

其中使用了blob trigger,同时在代码中通过 

await destBlob.StartCopyFromUriAsync(sourceSas) 将blob 拷贝到目的地中,其中sourceSas 通过如下代码生成:
var sourceSas = sourceBlob.GenerateSasUri(Azure.Storage.Sas.BlobSasPermissions.Read, DateTime.UtcNow.AddMinutes(-10).AddDays(7));
 
之所以使用SAS的方式,是因为如果出现跨Storage Account的情况,则必须要SAS才能访问源或者需要将源设定为public访问。
 
需要注意的是,经测试,如果该Functions 部署到Azure IoT Edge中,且由云端blob 触发拷贝到本地 iot edge 中的 blob时,GenerateSasUri方法不能正常工作,此时可以使用:
//await sourceBlob.DownloadToAsync(sourceBlob.Name);
//await destBlob.UploadAsync(sourceBlob.Name);
先下载到Functions中,然后Upload到IoT Edge中的blob里的方式。
 
 
配置文件:
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=xxx",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "source-storage-connection-string": "DefaultEndpointsProtocol=https;AccountName=xxx",
    "source-container-name":"samples-workitems",
    "target-storage-connection-string":"DefaultEndpointsProtocol=https;AccountName=xxx",
    "target-container-name":"target"
  }
}
 
 
 
演示视频: