Have you heard about "Big Data" analytics, ETL, Data Warehouses and been confused about how to use it?
Have you ever wondered how "AWS S3" might be synchronizing your files with "Google Cloud Storage" in the background?
In this 20 minute tutorial, we'll walk through building a Node.js Lambda function in AWS to transfer data between accounts. We will create a Lambda function which listens for new files added to the source S3 bucket and then triggers a process which copies them to Google Storage.
Part II of this series provides a tutorial on building a Data Warehouse In the Cloud using Google BigQuery. It provides a step-by-step guide to building a fast Big Data Warehouse solution including load monitoring and error handling.
You are here >>> Part 1: Sync only new/changed files from AWS S3 to GCP cloud storage.
Part 2: Monitor the data transfer and data load into BigQuery.
Part 3: Create tables and scheduling ETL scripts.
Finally, we'll need Node.js v8.6 or above to run our core application.
If you don't have any of these just create it. It's Free.
Before starting, let's think about what we need to do here. We will create a lambda function to transfer files from S3 to Google Storage which does the following:
Sounds simple.
We will be using AWS Node.js SDK in order to copy files from S3. You can find usage guides for S3 SDK here - docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html.
We will also need to create a Google service account in order to use Google account credentials. This page contains the official instructions for Google managed accounts: How to create service account. After this you will be able to download your credentials and use Google API programmatically with Node.js. Your credentials file will look like this (example):
./your-google-project-12345.json
{
"type": "service_account",
"project_id": "your-project",
"private_key_id": "1202355D6DFGHJK54A223423DAD34320F5435D",
"private_key": "-----BEGIN PRIVATE KEY-----\ASDFGHJKLWQERTYUILKJHGFOIUYTREQWEQWE4BJDM61T4S0WW3XKXHIGRV\NRTPRDALWER3/H94KHCKCD3TEJBRWER4CX9RYCIT1BY7RDBBVBCTWERTYFGHCVBNMLKXCVBNM,MRaSIo=\n-----END PRIVATE KEY-----\n",
"client_email": "your-project-adminsdk@your-project.iam.gserviceaccount.com",
"client_id": "1234567890",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/your-project-adminsdk%40your-project.iam.gserviceaccount.com"
}
Run the following commands to clone the starter project and install the dependencies
git clone https://github.com/mshakhomirov/bigquery-etl-tutorial.git
cd bigquery-etl-tutorial
npm install
The starter branch includes a base directory template, with dependencies declared in package.json.
Your lambda's package.json file should look like that:
{"name": "gcs-transfer",
"version": "0.0.1",
"private": true,
"scripts": {},
"dependencies": {
"@google-cloud/storage": "^3.3.0",
"aws-sdk": "^2.512.0",
"request": "^2.81.0"
},
"description": "Microservice lambda to copy files from S3 to Google cloud storage.",
"main": "index.js",
"devDependencies": {},
"author": "Mike Shakhomirov - mshakhomirov@gmail.com",
"license": "ISC"
}
After you run npm install command all required dependencies will be installed.
Let's assume we have files in different S3 buckets and as soon as new files are there we'd like them to be copied to Google Storage.
Let's create our lambda function file called:
./index.js
'use strict';
const AWS = require('aws-sdk');
const async = require('async');
const { Storage } = require('@google-cloud/storage');
AWS.config.update({region: "eu-west-1"});
let s3 = new AWS.S3();
const moment = require('moment');
let config = require('./config.json');
exports.handler = async (event, context) => {
let tables = config.Tables;
for (const table of tables) {
if (event.Records[0].s3.object.key.indexOf(table.name) > 0) {
console.log('Loading into a table ',table.name);
console.log('from: ',event.Records[0].s3.object.key);
try {
let data = await processEvent(event.Records);
context.succeed(data);
} catch (e) {
console.log(e);
context.done(e);
}
}
};
};
let processEvent = async (Records) => {
let srcBucket = Records[0].s3.bucket.name;
let srcKey = decodeURIComponent(Records[0].s3.object.key.replace(/\+/g, " "));
let data = await s3.getObject({Bucket: srcBucket, Key: srcKey}).promise();
let googleCreds = await getGoogleCredentials();
try {
await copyToGoogleStorage(data.Body, srcKey, googleCreds);
} catch (error) {
console.log(error);
};
return 'Success' ;
};
let getGoogleCredentials = async () => {
let creds = await s3.getObject({Bucket: config.cred_bucket, Key: config.cred_s3_obj}).promise();
var chunk = creds.Body.toString();
let googleKey = JSON.parse(chunk).private_key.split("\\n");
return googleKey;
}
;
let copyToGoogleStorage = async (data,destination, googleCreds) => {
//Create GCS object
const storage = new Storage({
projectId: config.gcp_project_id,
credentials: {
client_email: config.gcp_client_email,
private_key: googleCreds[0]
}
});
let Bucket = {};
try {
Bucket = await storage.bucket(config.googleBucket);
const options = {
metadata:{
contentType: data.ContentType
}
};
await Bucket.file(destination).save(data, options);
console.log('File written successfully:', destination);
} catch (error) {
console.log(error);
}
}
;
Have a look at getGoogleCredentials function inside. We keep our Google credentials file your-google-project-12345.json in S3 bucket. It's a good security practice to do so.
We have also created let config = require('./config.json'); where we will keep our settings.
For example, my settings look like this:
{
"Tables": [
{
"name" : "daily_table1"
},
{
"name" : "daily_table2"
},
{
"name" : "daily_table3"
},
{
"name" : "daily_table4"
}
],
"cred_bucket": "googleCreds.my_project.aws",
"cred_s3_obj": "my-pgoogle-project-12345a.json",
"googleBucket": "your_google_destination_bucket",
"gcp_project_id": "your-google-project-name",
"gcp_client_email": "your-google-project-adminsdk@your-google-project.iam.gserviceaccount.com"
}
You can see table names there. Essentially these are file names prefixes so you will need to adjust your test files to reflect it accordingly. For example, filename in your source bucket must contain one of those names in config tables, e.g. s3://your-source-bucket/daily_table1-2019-10-01.json
You'll also need to change the "cred_bucket" entry to match your S3 bucket where you store your credentials. Change other variables to reflect your project too.
Now let's test our lambda locally. You might notice that we're using test.js file in our project.
./test.js:
"use strict";
let lambda = require('./index');
let event = require('./event.json');
lambda.handler(event,
{
done : r => console.log("done, result returned:\n", r),
succeed : r => console.log("success, result returned:\n", r),
fail : e => console.log(e)
}
);
You can see that we declared ./event.json file to simulate AWS S3 bucket Create object event. This file should have these contents:
./event.json:
{
"Records": [
{
"eventVersion": "2.0",
"eventSource": "aws:s3",
"awsRegion": "eu-west-1",
"eventTime": "2017-04-11T09:29:02.255Z",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "AWS:13123D54VVQXA:some-lambda"
},
"requestParameters": {
"sourceIPAddress": "12.22.333.105"
},
"responseElements": {
"x-amz-request-id": "12323456",
"x-amz-id-2": "QWERTYUIOPASDFGHJKLZXCVBNMFBA="
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "easdas49-ff4e-41c2-bf68-4b4b67657d0e",
"bucket": {
"name": "yourBucket.withFilesToTransfer.aws",
"ownerIdentity": {
"principalId": "QWERTY555"
},
"arn": "arn:aws:s3:::yourBucket.withFilesToTransfer.aws"
},
"object": {
"key": "folder1/folder2/2019/10/06/13/your-file-to-transfer-2-9c380b365d9b",
"size": 247,
"eTag": "146345662a47f28aede3fdeaa95e7",
"versionId": "SDFGHJKL45678",
"sequencer": "005DFGHJKE3940352C"
}
}
}
]
}
All we really care about is the name of the bucket and object key here. So change it to reflect your S3 test files accordingly.
Now run node test in your command line. It will trigger your local lambda which will use the file in S3 you specified in event object.key:
(base) Mikes-MBP:s3-to-gcs mikeshakhomirov$ node test
index.handler invoked with event { Records:
[ { eventVersion: '2.0',
eventSource: 'aws:s3',
awsRegion: 'eu-west-1',
eventTime: '2017-04-11T09:29:02.255Z',
eventName: 'ObjectCreated:Put',
userIdentity: [Object],
requestParameters: [Object],
responseElements: [Object],
s3: [Object] } ] }
Loading into a table my-file-to-transfer-production
from: my-file-to-transfer/2019/10/06/13/my-file-to-transfer-production-2-2019-10-06-13-21-26-d9b
success, result returned:
Buffer 7b 22 67 72 6f 75 70 49 64 22 3a 31 37 37 30 39 35 39 31 2c 22 63 6f 6e 73 75 6d 65 72 43 6f 75 6e 74 22 3a 31 2c 22 62 72 6f 61 64 63 61 73 74 65 72 ... >
File written successfully.
(base) Mikes-MBP:s3-to-gcs mikeshakhomirov$
Now if you go to Google Cloud console you should be able to see a new file in Storage.
Let's deploy the lambda to AWS and add S3 event trigger to invoke it.
We will use a shell script for this. Create a file ./deploy.sh.
./deploy.sh:
#!/usr/bin/env bash
# Add permissions to your file
# Just google how to do it if you are using Windows
# chmod +x the_file_name
zp="lambda_archive.zip"
echo $zp
rm -f $zp
zip -r $zp * -x deploy.sh
if ! aws lambda create-function \
--function-name gcs-transfer \
--description "Sync files from Datalake to Google Cloud Storage" \
--handler index.handler \
--runtime nodejs8.10 \
--role arn:aws:iam::12345693471:role/gcs-transfer_lambda \
--zip-file fileb://$zp;
then
echo ""
echo "Function already exists, updating instead..."
aws lambda update-function-code \
--function-name gcs-transfer \
--zip-file fileb://$zp;
fi
You can see that we are using a role role arn:aws:iam::12345693471:role/gcs-transfer_lambda here.
This role is required by AWS security rules so your lambda could access files in S3.
This article explaines how to create a role read here: Execution role
Great! Now let's run ./deploy.sh. The output will be some json confirming that lambda has been successfully deployed or updated.
Cool! Now we have a totally useless Lambda which is supposed to copy files between accounts. Now let's add a trigger. Go to AWS Console -> Lambda -> Functions -> your function -> Configuration and add S3 trigger with source bucket name.
Now try uploading some files to your S3 source bucket. Our lambda will pick it up and copy it. You should see the same file in Google Cloud destination bucket.
We have just created a lambda to sync our AWS S3 cloud with Google Storage, tested it locally and created a shell script to deploy and update it. Great, we now have an AWS Lambda function which transfers the files between accounts. Time to add some ETL scripts to load data into BigQuery.
In Part 2 we will see how to load different file formats into BigQuery, do load monitoring and error handling and will create a few tables.
Thanks for reading!
Comments