Migrate from a MySQL database to AWS DynamoDB
2022-04-23 04:33:00 【Heavy dust】
Go: migrating from a MySQL database to Amazon DynamoDB
1. Preface
I am learning Go, and my team lead asked me to investigate whether a migration from MySQL to DynamoDB could be done with a script.
This post describes a simple migration tool I wrote in Go. Because I am new to the language, the code is not elegant and nothing has been optimized.
The tool performs a plain one-to-one copy of the data into DynamoDB. Every MySQL column, whatever its original type, is stored in DynamoDB as a String.
I will keep improving the program, and suggestions for changes and optimizations from experienced Go developers are welcome.
1.1 Migration background
Many companies consider migrating from relational databases such as MySQL to Amazon DynamoDB.
Amazon DynamoDB is a fully managed, fast, highly scalable, and flexible NoSQL database. Its capacity can be increased or reduced with traffic as business needs change. Compared with a typical media-based RDBMS, this makes it easier to optimize the total cost of the service.
1.2 Migration issues
- Service interruption caused by downtime, especially when the service must be available to customers around the clock (24/7/365)
- Different key designs in an RDBMS and in DynamoDB
1.3 Official AWS migration methods
Two migration methods based on AWS managed services: https://aws.amazon.com/cn/blogs/big-data/near-zero-downtime-migration-from-mysql-to-dynamodb/
Video tutorial: https://www.youtube.com/watch?v=j88icq7JArI
2. Ideas and functions
2.1 Ideas
- Initialize the database connection and the DynamoDB client
- Read the MySQL databases
- Get the MySQL tables in each database
- Convert the table structure to a DynamoDB structure (fields, types)
  - Get the table fields and determine whether each field is a primary key
- Create the DynamoDB table:
  - Define the DynamoDB table name: MySQL database name, underscore, table name
  - Create the DynamoDB table
- Loop over the data in each MySQL table and load it into DynamoDB
  - Get the data of every column and the number of rows
  - Read the data from the table
  - Write the data to DynamoDB
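The naming rule in the list above (database name, underscore, table name) can be sketched as a small helper. The function name `dynamoTableName` is hypothetical and not taken from the original program. The check reflects DynamoDB's documented table-name rules (3 to 255 characters drawn from letters, digits, underscore, hyphen, and dot). MySQL allows names outside this set, so a migration may want to reject or rewrite them before calling CreateTable.

```go
package main

import (
	"fmt"
	"regexp"
)

// validName matches the characters and length DynamoDB accepts for table names.
var validName = regexp.MustCompile(`^[A-Za-z0-9_.-]{3,255}$`)

// dynamoTableName (hypothetical helper) joins the MySQL database and table
// names with an underscore and rejects results DynamoDB would refuse.
func dynamoTableName(database, table string) (string, error) {
	name := database + "_" + table
	if !validName.MatchString(name) {
		return "", fmt.Errorf("%q is not a valid DynamoDB table name", name)
	}
	return name, nil
}

func main() {
	name, err := dynamoTableName("shop", "orders")
	fmt.Println(name, err)

	// A table name containing a space is legal in MySQL but not in DynamoDB.
	_, err = dynamoTableName("shop", "order items")
	fmt.Println(err)
}
```

Returning an error rather than silently rewriting the name keeps the decision about renaming with the caller.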
2.2 The main function
The main function calls the other functions.
It first initializes the database connection and the Amazon DynamoDB client:
// Initialize the database connection
db := Mysql.ConnectDB()
if db == nil {
	fmt.Printf("init db failed!\n")
	return
}
// Initialize the DynamoDB client
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("ap-southeast-1"))
if err != nil {
	log.Fatalf("unable to load SDK config, %v", err)
}
// Using the Config value, create the DynamoDB client
svc := dynamodb.NewFromConfig(cfg)
The functions are called in this order:
// 1. Read the databases
Mysql.DatabaseInfo(db)
// 2. Read the tables of a database
Mysql.TableInfo(db, database[i])
// 3. Get the fields of a table
Mysql.TableFiledInfo(db, database[i], table[j])
// 4. Create the DynamoDB table
DynamoDB.CreateDynamoDB(svc, field, tableName)
// 5.1 Get the data of every column and the number of rows
Mysql.TableData(db, field, database[i], table[j])
// 5.3 Write the data to DynamoDB
DynamoDB.PutItemDynamoDB(svc, itemMap, tableName)
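The call sequence above implies a nested loop over databases, tables, and rows. The following is a minimal sketch of that control flow only, not the original main function. `Source`, `Sink`, `Migrate`, and the in-memory fakes are hypothetical names, introduced so the loop can run without MySQL or AWS.

```go
package main

import "fmt"

// Source is a hypothetical stand-in for the MySQL-side functions.
type Source interface {
	Databases() ([]string, error)
	Tables(database string) ([]string, error)
	Rows(database, table string) ([]map[string]string, error)
}

// Sink is a hypothetical stand-in for the DynamoDB-side functions.
type Sink interface {
	CreateTable(name string) error
	PutItem(table string, item map[string]string) error
}

// Migrate walks databases, then tables, then rows, and returns the number of items written.
func Migrate(src Source, dst Sink) (int, error) {
	written := 0
	databases, err := src.Databases()
	if err != nil {
		return written, fmt.Errorf("list databases: %w", err)
	}
	for _, database := range databases {
		tables, err := src.Tables(database)
		if err != nil {
			return written, fmt.Errorf("list tables in %s: %w", database, err)
		}
		for _, table := range tables {
			name := database + "_" + table // naming rule from section 2.1
			if err := dst.CreateTable(name); err != nil {
				return written, fmt.Errorf("create %s: %w", name, err)
			}
			rows, err := src.Rows(database, table)
			if err != nil {
				return written, fmt.Errorf("read %s.%s: %w", database, table, err)
			}
			for _, row := range rows {
				if err := dst.PutItem(name, row); err != nil {
					return written, fmt.Errorf("put into %s: %w", name, err)
				}
				written++
			}
		}
	}
	return written, nil
}

// memSource and memSink are in-memory fakes used only to exercise Migrate.
type memSource map[string]map[string][]map[string]string

func (m memSource) Databases() ([]string, error) {
	var names []string
	for name := range m {
		names = append(names, name)
	}
	return names, nil
}

func (m memSource) Tables(database string) ([]string, error) {
	var names []string
	for name := range m[database] {
		names = append(names, name)
	}
	return names, nil
}

func (m memSource) Rows(database, table string) ([]map[string]string, error) {
	return m[database][table], nil
}

type memSink map[string][]map[string]string

func (m memSink) CreateTable(name string) error { m[name] = nil; return nil }

func (m memSink) PutItem(table string, item map[string]string) error {
	m[table] = append(m[table], item)
	return nil
}

func main() {
	src := memSource{"shop": {"orders": {{"id": "1"}, {"id": "2"}}}}
	dst := memSink{}
	n, err := Migrate(src, dst)
	fmt.Println(n, err, len(dst["shop_orders"]))
}
```

Hiding both sides behind interfaces is a design choice for this sketch. It lets the loop be tested with fakes before pointing it at a real database.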
3. MySQL functions
3.1 Query all non-system databases
In MySQL, the INFORMATION_SCHEMA.TABLES table stores metadata about the tables in every database.
Each row of this metadata names the database (table_schema) that a table belongs to, so the list of databases can be queried from INFORMATION_SCHEMA.TABLES:
SELECT table_schema databaseName
FROM INFORMATION_SCHEMA.TABLES
WHERE UPPER(table_type)='BASE TABLE'
AND table_schema NOT IN ('mysql','performance_schema','sys')
GROUP BY table_schema
ORDER BY table_schema asc
UPPER(table_type)='BASE TABLE' selects only base tables, which leaves out views. Tables created by users are of type BASE TABLE.
The mysql, performance_schema, and sys databases also contain tables of type BASE TABLE. These three databases ship with MySQL and are not user databases, so they need to be excluded.
func DatabaseInfo(db *sql.DB) []string {
	sqlStr := `SELECT table_schema databaseName FROM INFORMATION_SCHEMA.TABLES WHERE UPPER(table_type)='BASE TABLE' AND table_schema NOT IN ('mysql','performance_schema','sys') GROUP BY table_schema ORDER BY table_schema asc`
	rows, err := db.Query(sqlStr)
	if err != nil {
		fmt.Printf("query table name error!err:%v\n", err)
		return nil
	}
	// Defer Close only after the error check: rows is nil when Query fails
	defer rows.Close()
	var result []string
	for rows.Next() {
		var databaseName string
		err = rows.Scan(&databaseName)
		if err != nil {
			fmt.Printf("scan table name error!err:%v\n", err)
			return nil
		}
		result = append(result, databaseName)
	}
	// Report an error that ended the iteration early
	if err = rows.Err(); err != nil {
		fmt.Printf("iterate table name error!err:%v\n", err)
		return nil
	}
	return result
}
The query results are traversed and the database names are returned as a slice.
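The list of excluded databases is hard-coded in the query above. As a variant, the list could be passed in and bound as ? parameters. This is a minimal sketch under that assumption: `databaseQuery` is a hypothetical helper, and it uses SELECT DISTINCT in place of the original GROUP BY.

```go
package main

import (
	"fmt"
	"strings"
)

// databaseQuery (hypothetical helper) builds the database-listing query with one
// ? placeholder per excluded schema and returns the matching argument list.
func databaseQuery(excluded []string) (string, []any) {
	query := "SELECT DISTINCT table_schema FROM INFORMATION_SCHEMA.TABLES WHERE table_type = 'BASE TABLE'"
	args := make([]any, 0, len(excluded))
	if len(excluded) > 0 {
		// Produces "?,?,?" for three excluded names.
		marks := strings.TrimSuffix(strings.Repeat("?,", len(excluded)), ",")
		query += " AND table_schema NOT IN (" + marks + ")"
		for _, name := range excluded {
			args = append(args, name)
		}
	}
	return query + " ORDER BY table_schema", args
}

func main() {
	query, args := databaseQuery([]string{"mysql", "performance_schema", "sys"})
	fmt.Println(query)
	fmt.Println(args...)
}
```

The result would be passed on as `db.Query(query, args...)`, so the excluded names never become part of the SQL text.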
3.2 Query table information
The tables in a database are also listed by querying INFORMATION_SCHEMA.TABLES. The code logic is the same as for querying the databases.
The query statement sqlStr takes the database name as its ? parameter:
sqlStr := `SELECT table_name tableName FROM INFORMATION_SCHEMA.TABLES WHERE UPPER(table_type)='BASE TABLE' AND LOWER(table_schema) = ? GROUP BY table_name ORDER BY table_name asc`
The query results are traversed and the table names are returned as a slice.
3.3 Query table field information
The fields of a table are queried from INFORMATION_SCHEMA.COLUMNS. The code logic is the same as for querying the databases.
Table field structure:
- Fname: the table field name
- ColumnKey: whether the table field is a primary key (PRI)
- dataType: the table field type
type Field struct {
	Fname     string
	ColumnKey string
	dataType  string
}
The query statement sqlStr takes the database name and the table name as its ? parameters:
sqlStr := `SELECT COLUMN_NAME fName,COLUMN_KEY columnKey,DATA_TYPE dataType FROM information_schema.columns WHERE table_schema = ? AND table_name = ?`
The function returns a slice of table field structures.
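The DATA_TYPE value returned by this query could be used to choose a DynamoDB scalar type rather than storing everything as a String. DynamoDB's scalar types for key attributes are S (string), N (number), and B (binary). The sketch below is one possible mapping and is not part of the original program. `scalarTypeFor` is a hypothetical name, and the grouping of MySQL types is an assumption that should be reviewed per table.

```go
package main

import (
	"fmt"
	"strings"
)

// scalarTypeFor (hypothetical helper) maps a MySQL DATA_TYPE name to a DynamoDB
// scalar type code: "N" for numbers, "B" for binary, "S" for everything else.
func scalarTypeFor(dataType string) string {
	switch strings.ToLower(dataType) {
	case "tinyint", "smallint", "mediumint", "int", "bigint", "decimal", "float", "double":
		return "N"
	case "binary", "varbinary", "tinyblob", "blob", "mediumblob", "longblob":
		return "B"
	default:
		// Text, date, and time types fall back to String, as in the original program.
		return "S"
	}
}

func main() {
	for _, t := range []string{"int", "varchar", "blob", "datetime"} {
		fmt.Println(t, "->", scalarTypeFor(t))
	}
}
```

A plain lookup on the type name is enough for this decision, because INFORMATION_SCHEMA already reports the column type as text.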
3.4 Query all the data in a table
With Go's database/sql package and the github.com/go-sql-driver/mysql driver, Scan must be given the same number of destination variables as there are columns in the query result.
Official documentation: https://pkg.go.dev/database/sql#Row.Scan
I therefore take the table fields obtained by the previous function, query the columns one at a time, and collect the results column by column into a map.
- Traverse the field slice and use Query to fetch the values of that column from the table
- Use rows.Next to traverse the query result and rows.Scan to get each column value, appending it to a slice
- Use a map of the form key (field name): value (slice of values) to store all the data of a table
func TableData(db *sql.DB, field []Field, database, table string) (map[string][]string, int) {
	result := make(map[string][]string)
	var rowsLength int
	for i := 0; i < len(field); i++ {
		// Identifiers cannot be bound as ? parameters, so they are quoted with backticks
		sqlStr := "SELECT `" + field[i].Fname + "` FROM `" + database + "`.`" + table + "`"
		rows, err := db.Query(sqlStr)
		if err != nil {
			fmt.Printf("Failed to query table! error: %v\n", err)
			return nil, 0
		}
		var columnValue string
		var oneResult []string
		for rows.Next() {
			// Note: scanning into a string returns an error for NULL values
			err = rows.Scan(&columnValue)
			if err != nil {
				rows.Close()
				fmt.Printf("Failed to scan a field in a table!err:%v\n", err)
				return nil, 0
			}
			oneResult = append(oneResult, columnValue)
		}
		// Close each result set before the next query; a defer inside the loop
		// would keep every result set open until the function returns
		rows.Close()
		if len(oneResult) == 0 {
			fmt.Printf("%v.%v not data!\n", database, table)
			return nil, 0
		}
		result[field[i].Fname] = oneResult
		rowsLength = len(oneResult)
	}
	return result, rowsLength
}
- db: *sql.DB, the database connection
- field: []Field, the table fields
- database: string, the database name
- table: string, the table name
- return: map[string][]string, the data of each column, together with the number of rows (int)
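Each column is fetched by a separate query without ORDER BY, so MySQL does not guarantee that the values come back in the same row order every time. An alternative is to read whole rows with one query and let rows.Columns determine how many scan destinations are needed. This is a minimal sketch of that alternative, not the original program. `TableRows` and `quoteIdent` are hypothetical names, and NULL values are simply left out of the row map.

```go
package main

import (
	"database/sql"
	"fmt"
	"strings"
)

// quoteIdent wraps a MySQL identifier in backticks, doubling any embedded backtick.
func quoteIdent(name string) string {
	return "`" + strings.ReplaceAll(name, "`", "``") + "`"
}

// TableRows (hypothetical helper) reads every row of database.table with a single
// query and returns one map per row, keyed by column name.
func TableRows(db *sql.DB, database, table string) ([]map[string]string, error) {
	rows, err := db.Query("SELECT * FROM " + quoteIdent(database) + "." + quoteIdent(table))
	if err != nil {
		return nil, fmt.Errorf("query %s.%s: %w", database, table, err)
	}
	defer rows.Close()

	columns, err := rows.Columns()
	if err != nil {
		return nil, err
	}

	var result []map[string]string
	for rows.Next() {
		// One sql.NullString per column, so Scan gets exactly as many
		// destinations as the result has columns.
		values := make([]sql.NullString, len(columns))
		dests := make([]any, len(columns))
		for i := range values {
			dests[i] = &values[i]
		}
		if err := rows.Scan(dests...); err != nil {
			return nil, err
		}
		row := make(map[string]string, len(columns))
		for i, column := range columns {
			if values[i].Valid { // skip NULL values
				row[column] = values[i].String
			}
		}
		result = append(result, row)
	}
	return result, rows.Err()
}

func main() {
	// TableRows needs a live MySQL connection, so only the quoting helper runs here.
	fmt.Println(quoteIdent("orders"))
}
```

Reading whole rows keeps the values of one row together, so no later step has to realign columns by index.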
4. Amazon DynamoDB
4.1 Create the DynamoDB table
A DynamoDB table's primary key consists of one partition key and, optionally, one sort key. Amazon DynamoDB also supports global and local secondary indexes, but they are more complicated, so only the partition key and sort key are used here.
A DynamoDB table therefore has at most two key attributes, and a partition key must always be specified.
A MySQL table, however, may have more than two primary key columns or none at all. To handle both cases, I check the ColumnKey value obtained earlier for each field.
If there are two or more primary key columns, the first two in the query results become the partition key and the sort key respectively; if there is exactly one, it becomes the partition key; if there is no primary key, the first column of the query result becomes the partition key. Note that when the chosen key is not unique across rows, later writes overwrite earlier items, because PutItem replaces any existing item with the same primary key.
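The selection rule can be isolated in a small pure function, which makes the edge cases easy to check without AWS. This is a minimal sketch: `pickKeys` is a hypothetical name, and the local `Field` type mirrors only the two fields of the article's structure that the rule needs.

```go
package main

import (
	"errors"
	"fmt"
)

// Field mirrors the Fname and ColumnKey fields of the article's Field structure.
type Field struct {
	Fname     string
	ColumnKey string
}

// pickKeys (hypothetical helper) returns the partition key and, if present, the sort key:
// the first two "PRI" columns in order, or the first column when no primary key exists.
func pickKeys(fields []Field) (partition, sortKey string, err error) {
	if len(fields) == 0 {
		return "", "", errors.New("table has no columns")
	}
	for _, f := range fields {
		if f.ColumnKey != "PRI" {
			continue
		}
		if partition == "" {
			partition = f.Fname
			continue
		}
		// Second primary key column found; any further ones are ignored.
		return partition, f.Fname, nil
	}
	if partition == "" {
		partition = fields[0].Fname // no primary key: fall back to the first column
	}
	return partition, "", nil
}

func main() {
	fields := []Field{
		{Fname: "note"},
		{Fname: "order_id", ColumnKey: "PRI"},
		{Fname: "line_no", ColumnKey: "PRI"},
	}
	partition, sortKey, err := pickKeys(fields)
	fmt.Println(partition, sortKey, err)
}
```

Returning an error for a table without columns avoids indexing into an empty slice.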
A better approach would be to provide an interface through which the key design of each table can be adjusted to actual production needs before the table is converted to the DynamoDB format.
In addition, every attribute is created as a String; the original field type is not examined. My understanding is that the best approach would be to use Go's reflection mechanism to decide which DynamoDB attribute type each field should be converted to.
func CreateDynamoDB(svc *dynamodb.Client, field []Mysql.Field, tableName string) *dynamodb.CreateTableOutput {
	// A table without fields cannot provide a partition key
	if len(field) == 0 {
		fmt.Printf("No fields found, cannot create DynamoDB table! tableName:%v\n", tableName)
		return nil
	}
	var attributeDefinitions []types.AttributeDefinition
	var keySchema []types.KeySchemaElement
	for i := 0; i < len(field); i++ {
		if field[i].ColumnKey != "PRI" {
			continue
		}
		// The first primary key is used as the partition key, the second as the sort key
		keyType := types.KeyTypeHash
		if len(keySchema) >= 1 {
			keyType = types.KeyTypeRange
		}
		attributeDefinitions = append(attributeDefinitions, types.AttributeDefinition{
			AttributeName: aws.String(field[i].Fname),
			AttributeType: types.ScalarAttributeTypeS,
		})
		keySchema = append(keySchema, types.KeySchemaElement{
			AttributeName: aws.String(field[i].Fname),
			KeyType:       keyType,
		})
		// When there are more than two primary keys, only the first two are used
		if len(keySchema) >= 2 {
			fmt.Printf("The database primary key is greater than or equal to 2!tableName:%v\n", tableName)
			break
		}
	}
	// If there is no primary key, use the first table field as the DynamoDB partition key
	if len(keySchema) == 0 {
		attributeDefinitions = []types.AttributeDefinition{
			{
				AttributeName: aws.String(field[0].Fname),
				AttributeType: types.ScalarAttributeTypeS,
			},
		}
		keySchema = []types.KeySchemaElement{
			{
				AttributeName: aws.String(field[0].Fname),
				KeyType:       types.KeyTypeHash,
			},
		}
		fmt.Printf("No primary key exists in the database!tableName:%v\n", tableName)
	}
	input := &dynamodb.CreateTableInput{
		AttributeDefinitions: attributeDefinitions,
		KeySchema:            keySchema,
		ProvisionedThroughput: &types.ProvisionedThroughput{
			ReadCapacityUnits:  aws.Int64(5),
			WriteCapacityUnits: aws.Int64(5),
		},
		TableName: aws.String(tableName),
	}
	result, err := svc.CreateTable(context.TODO(), input)
	if err != nil {
		fmt.Printf("Failed to create DynamoDB! error: %v\n", err)
		return nil
	}
	// CreateTable is asynchronous, so wait for a while before moving on to the next step
	time.Sleep(time.Second * 5)
	return result
}
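A fixed five-second sleep can be too short for some tables and needlessly long for others. One alternative is to poll the table status until it reports ACTIVE. The sketch below shows only that polling loop, using the standard library. `statusFunc`, `waitUntilActive`, and `simulateCreate` are hypothetical names, and in a real program the callback would wrap a DescribeTable call. The AWS SDK for Go v2 also provides a ready-made waiter, `dynamodb.NewTableExistsWaiter`, which may be preferable to a hand-written loop.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// statusFunc is a hypothetical callback that reports the current table status,
// for example by wrapping a DescribeTable call.
type statusFunc func(ctx context.Context) (string, error)

// waitUntilActive polls status until it reports "ACTIVE", the context ends,
// or the status check itself fails.
func waitUntilActive(ctx context.Context, status statusFunc, interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		s, err := status(ctx)
		if err != nil {
			return fmt.Errorf("check table status: %w", err)
		}
		if s == "ACTIVE" {
			return nil
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf("table still %s: %w", s, ctx.Err())
		case <-ticker.C:
			// Interval elapsed; check again.
		}
	}
}

// simulateCreate fakes a table that becomes ACTIVE on the third check
// and returns how many checks were made.
func simulateCreate() (int, error) {
	calls := 0
	status := func(context.Context) (string, error) {
		calls++
		if calls < 3 {
			return "CREATING", nil
		}
		return "ACTIVE", nil
	}
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	err := waitUntilActive(ctx, status, 10*time.Millisecond)
	return calls, err
}

func main() {
	calls, err := simulateCreate()
	fmt.Println(calls, err)
}
```

The context timeout bounds the total wait, so a table that never becomes active cannot block the migration forever.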
4.2 Insert data
Traverse all the data read from the MySQL table, put each row into a map in the format DynamoDB expects, and call the PutItem API to add the data.
All values are added as Strings; the original field type is not examined. My understanding is that the best approach would be to use Go's reflection mechanism to decide which DynamoDB attribute type each field should be converted to.
for k := 0; k < rowLength; k++ {
	itemMap := make(map[string]types.AttributeValue)
	for itemName, item := range tableData {
		itemMap[itemName] = &types.AttributeValueMemberS{Value: item[k]}
	}
	// 5.3 Write the row to DynamoDB
	putItemResult := DynamoDB.PutItemDynamoDB(svc, itemMap, tableName)
	if putItemResult == nil {
		// PutItemDynamoDB returns nil on failure and has already printed the error
		panic("put item failed for table " + tableName)
	}
	fmt.Println("put Item succeed!")
}
func PutItemDynamoDB(svc *dynamodb.Client, itemMap map[string]types.AttributeValue, tableName string) *dynamodb.PutItemOutput {
	input := &dynamodb.PutItemInput{
		Item:                   itemMap,
		ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal,
		TableName:              aws.String(tableName),
	}
	result, err := svc.PutItem(context.TODO(), input)
	if err != nil {
		fmt.Printf("Failed to put Item! error: %v\n", err)
		return nil
	}
	return result
}
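The loop in this section turns column-wise data (field name to slice of values) into one item per row by indexing every column with the same row number. That pivot can be sketched on plain strings, without the SDK types, as follows. `rowsFromColumns` is a hypothetical name, and the length check is an added safeguard so that a short column produces an error rather than an index-out-of-range panic.

```go
package main

import "fmt"

// rowsFromColumns (hypothetical helper) pivots column-wise data into one map per row.
// It returns an error if any column does not hold exactly rowCount values.
func rowsFromColumns(columns map[string][]string, rowCount int) ([]map[string]string, error) {
	rows := make([]map[string]string, rowCount)
	for i := range rows {
		rows[i] = make(map[string]string, len(columns))
	}
	for name, values := range columns {
		if len(values) != rowCount {
			return nil, fmt.Errorf("column %q has %d values, want %d", name, len(values), rowCount)
		}
		for i, value := range values {
			rows[i][name] = value
		}
	}
	return rows, nil
}

func main() {
	columns := map[string][]string{
		"id":   {"1", "2"},
		"name": {"alice", "bob"},
	}
	rows, err := rowsFromColumns(columns, 2)
	fmt.Println(rows, err)
}
```

Each resulting row map corresponds to one itemMap in the loop, with the string values wrapped in String attribute values before the PutItem call.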
Appendix:
References
MySQL Driver: https://github.com/Go-SQL-Driver/MySQL/
AWS Go DynamoDB SDK v2: https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/dynamodb#Client.PutItem
Code
GitHub: https://github.com/MoGD2018/Go-mysql-convert-to-dynamodb
Gitee: https://gitee.com/MoGD/Go-mysql-convert-to-dynamodb
Copyright notice
This article was written by [Heavy dust]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/04/202204230413460365.html