本文介绍:
一种通过Azure Functions 同步 blob的方法。
在之前的内容中,我们分享过 Azure Functions + azcopy 的同步方式;今天介绍在 Functions 中使用 Blob SDK 进行同步。使用该方案,Functions 既可以部署到云端,也可以部署到边缘侧。
示例代码:https://github.com/sean8544/azure-blob-sync-by-azure-function-blob-trigger
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
namespace Company.Function
{
    /// <summary>
    /// Blob-triggered Azure Function that mirrors every blob written to the source
    /// container into a target container, which may live in a different storage account.
    /// </summary>
    /// <remarks>
    /// Reads four application settings: <c>source-storage-connection-string</c>,
    /// <c>source-container-name</c>, <c>target-storage-connection-string</c> and
    /// <c>target-container-name</c>.
    /// </remarks>
    public static class BlobTrigger1
    {
        /// <summary>
        /// Entry point invoked by the Functions runtime when a blob appears in the source container.
        /// </summary>
        /// <param name="myBlob">Content stream of the blob that fired the trigger; only its length is logged.</param>
        /// <param name="name">Name of the triggering blob, bound from the <c>{name}</c> path segment.</param>
        /// <param name="log">Logger supplied by the Functions runtime.</param>
        /// <exception cref="InvalidOperationException">A required application setting is missing or empty.</exception>
        [FunctionName("BlobTrigger1")]
        public static async Task Run([BlobTrigger("%source-container-name%/{name}", Connection = "source-storage-connection-string")]Stream myBlob, string name, ILogger log)
        {
            log.LogInformation($"C# Blob trigger function Processed blob\n Name:{name} \n Size: {myBlob.Length} Bytes");

            // Fail fast with a readable message instead of an opaque argument error
            // from the BlobContainerClient constructor when a setting is absent.
            string sourceConnectionString = GetRequiredSetting("source-storage-connection-string");
            string sourceContainerName = GetRequiredSetting("source-container-name");
            string targetConnectionString = GetRequiredSetting("target-storage-connection-string");
            string targetContainerName = GetRequiredSetting("target-container-name");

            BlobContainerClient sourceContainerClient = new BlobContainerClient(sourceConnectionString, sourceContainerName);
            BlobContainerClient targetContainerClient = new BlobContainerClient(targetConnectionString, targetContainerName);

            await CopyBlobAsync(sourceContainerClient, targetContainerClient, name, log);
        }

        /// <summary>
        /// Reads an application setting from the process environment.
        /// </summary>
        /// <param name="settingName">Name of the setting (environment variable) to read.</param>
        /// <returns>The non-empty setting value.</returns>
        /// <exception cref="InvalidOperationException">The setting is missing, empty or whitespace.</exception>
        private static string GetRequiredSetting(string settingName)
        {
            string value = Environment.GetEnvironmentVariable(settingName);
            if (string.IsNullOrWhiteSpace(value))
            {
                throw new InvalidOperationException($"Application setting '{settingName}' is missing or empty.");
            }

            return value;
        }

        /// <summary>
        /// Starts a server-side copy of <paramref name="blobName"/> from the source container
        /// to the target container, holding a lease on the source blob while the copy is started.
        /// </summary>
        /// <param name="sourceContainer">Container that holds the blob to copy.</param>
        /// <param name="targetContainer">Container that receives the copy (same blob name).</param>
        /// <param name="blobName">Name of the blob to copy.</param>
        /// <param name="log">Logger used for progress and error messages.</param>
        /// <remarks>
        /// Storage request failures are logged and swallowed (best effort), matching the
        /// original behavior. A lease acquired here is always released, even when the copy
        /// fails; a lease held by another client is never touched.
        /// </remarks>
        private static async Task CopyBlobAsync(BlobContainerClient sourceContainer, BlobContainerClient targetContainer, string blobName, ILogger log)
        {
            try
            {
                // Client for the source blob to copy.
                BlobClient sourceBlob = sourceContainer.GetBlobClient(blobName);
                // Client for the destination blob (same name in the target container).
                BlobClient destBlob = targetContainer.GetBlobClient(blobName);

                // Nothing to do when the source blob has disappeared since the trigger fired.
                if (!await sourceBlob.ExistsAsync())
                {
                    log.LogWarning($"{blobName} does not exist in the source container; nothing to copy.");
                    return;
                }

                // Lease the source blob for the copy operation
                // to prevent another client from modifying it.
                BlobLeaseClient lease = sourceBlob.GetBlobLeaseClient();
                bool leaseAcquired = false;
                try
                {
                    // Specifying -1 for the lease interval creates an infinite lease.
                    await lease.AcquireAsync(TimeSpan.FromSeconds(-1));
                    leaseAcquired = true;
                }
                catch (Exception ex)
                {
                    // Best effort: the copy is still attempted without a lease
                    // (e.g. when another client already holds one).
                    log.LogError(ex, $"Set lease error:{ex.Message}");
                }

                try
                {
                    // Get the source blob's properties and display the lease state.
                    BlobProperties sourceProperties = await sourceBlob.GetPropertiesAsync();
                    log.LogInformation($"Lease state: {sourceProperties.LeaseState}");

                    // Generate a read-only SAS URI for the source blob so that the target
                    // storage account can read it when copying across storage accounts.
                    // The SAS expires roughly seven days from now.
                    Uri sourceSas = sourceBlob.GenerateSasUri(Azure.Storage.Sas.BlobSasPermissions.Read, DateTime.UtcNow.AddMinutes(-10).AddDays(7));

                    // StartCopyFromUriAsync only applies to cloud-storage-to-cloud-storage copies.
                    // When copying from the cloud to IoT Edge, download first and then upload instead.
                    // The call only starts the copy; the service may still be copying when it returns.
                    await destBlob.StartCopyFromUriAsync(sourceSas);

                    // Uncomment the code below if you want to sync from the cloud to IoT Edge.
                    //await sourceBlob.DownloadToAsync(sourceBlob.Name);
                    //await destBlob.UploadAsync(sourceBlob.Name);

                    // Get the destination blob's properties and display the copy status.
                    BlobProperties destProperties = await destBlob.GetPropertiesAsync();
                    log.LogInformation($"{blobName} Copy status: {destProperties.CopyStatus},Copy progress: {destProperties.CopyProgress},Completion time: {destProperties.CopyCompletedOn},Total bytes: {destProperties.ContentLength}");
                }
                finally
                {
                    // Release only a lease that this invocation acquired. Doing it in
                    // 'finally' guarantees the infinite lease never outlives a failed copy.
                    if (leaseAcquired)
                    {
                        await ReleaseLeaseAsync(sourceBlob, lease, blobName, log);
                    }
                }
            }
            catch (RequestFailedException ex)
            {
                log.LogError(ex, ex.Message);
            }
        }

        /// <summary>
        /// Releases the lease held by <paramref name="lease"/> on the source blob and logs
        /// the resulting lease state. Failures are logged rather than thrown so that they
        /// cannot mask an exception raised by the copy itself.
        /// </summary>
        /// <param name="sourceBlob">The leased source blob.</param>
        /// <param name="lease">Lease client that acquired the lease.</param>
        /// <param name="blobName">Blob name, used in log messages.</param>
        /// <param name="log">Logger used for progress and error messages.</param>
        private static async Task ReleaseLeaseAsync(BlobClient sourceBlob, BlobLeaseClient lease, string blobName, ILogger log)
        {
            try
            {
                await lease.ReleaseAsync();

                // Re-read the source blob's properties to report the lease state after release.
                BlobProperties sourceProperties = await sourceBlob.GetPropertiesAsync();
                log.LogInformation($"{blobName} Lease state: {sourceProperties.LeaseState}");
            }
            catch (RequestFailedException ex)
            {
                log.LogError(ex, $"Release lease error:{ex.Message}");
            }
        }
    }
}
其中使用了blob trigger,同时在代码中通过
await destBlob.StartCopyFromUriAsync(sourceSas) 将blob 拷贝到目的地中,其中sourceSas 通过如下代码生成:
var sourceSas = sourceBlob.GenerateSasUri(Azure.Storage.Sas.BlobSasPermissions.Read, DateTime.UtcNow.AddMinutes(-10).AddDays(7));
之所以使用 SAS,是因为在跨 Storage Account 拷贝时,目标端必须能够读取源 blob:要么为源 blob 提供带读取权限的 SAS,要么将源设置为 public 访问。
需要注意的是:经测试,如果将该 Functions 部署到 Azure IoT Edge 中,并由云端 blob 触发、向本地 IoT Edge 中的 blob 拷贝,则 GenerateSasUri 方法不能正常工作,此时可以使用:
//await sourceBlob.DownloadToAsync(sourceBlob.Name);
//await destBlob.UploadAsync(sourceBlob.Name);
先下载到Functions中,然后Upload到IoT Edge中的blob里的方式。
配置文件:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=xxx",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"source-storage-connection-string": "DefaultEndpointsProtocol=https;AccountName=xxx",
"source-container-name":"samples-workitems",
"target-storage-connection-string":"DefaultEndpointsProtocol=https;AccountName=xxx",
"target-container-name":"target"
}
}
演示视频: