Migrate from a MySQL database to AWS DynamoDB
2022-04-23 04:33:00 【Heavy dust】
Go: migrating a MySQL database to Amazon DynamoDB
1. Preface
I am learning Go, and my team lead asked me to investigate whether a MySQL database could be migrated to DynamoDB with a script.
This post describes a simple migration implemented in Go. I am a Go beginner, so the code is not elegant and nothing has been optimized.
The program only copies data one-to-one into DynamoDB, and every MySQL column type is stored as a String in DynamoDB.
I will keep improving the program, and suggestions for changes and optimizations from experienced Go developers are welcome.
1.1 Migration background
Many companies are considering migrating from relational databases such as MySQL to Amazon DynamoDB.
Amazon DynamoDB is a fully managed, fast, highly scalable, and flexible NoSQL database. Its capacity can be increased or decreased with traffic as the business requires. Compared with a typical media-based RDBMS, the total cost of the service can be optimized more easily.
1.2 Migration issues
- Service interruption caused by downtime, especially when the service must be available to customers around the clock (24/7/365)
- Different key designs in an RDBMS and in DynamoDB
1.3 Official AWS migration methods
Two migration methods based on AWS managed services: https://aws.amazon.com/cn/blogs/big-data/near-zero-downtime-migration-from-mysql-to-dynamodb/
Tutorial video: https://www.youtube.com/watch?v=j88icq7JArI
2. Approach and features
2.1 Approach
- Initialize the database connection and the DynamoDB client
- Read the MySQL databases
- Get the MySQL tables in each database
- Convert the table structure to a DynamoDB structure (fields, types)
  - Get the table fields and determine whether each field is a primary key
- Create the DynamoDB table:
  - Define the DynamoDB table name: <MySQL database name>_<table name>
  - Create the DynamoDB table
- Loop over the data in each MySQL table and load it into DynamoDB
  - Get the data for all columns and the number of rows
  - Read the data from the table
  - Write the data to DynamoDB
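The naming rule in the list above can be sketched as a small helper. This is only an illustration: `dynamoTableName` is a hypothetical name that does not appear in the original program, and the check reflects DynamoDB's table naming rules (3 to 255 characters from a-z, A-Z, 0-9, `_`, `-` and `.`), which a MySQL database or table name is not guaranteed to satisfy.

```go
package main

import (
	"fmt"
	"regexp"
)

// validName matches the characters DynamoDB accepts in table names
// (a-z, A-Z, 0-9, '_', '-', '.') with a length of 3 to 255.
var validName = regexp.MustCompile(`^[a-zA-Z0-9_.-]{3,255}$`)

// dynamoTableName is a hypothetical helper (not part of the original code).
// It joins the MySQL database and table names as "<database>_<table>" and
// rejects results that DynamoDB would refuse.
func dynamoTableName(database, table string) (string, error) {
	name := database + "_" + table
	if !validName.MatchString(name) {
		return "", fmt.Errorf("%q is not a valid DynamoDB table name", name)
	}
	return name, nil
}

func main() {
	name, err := dynamoTableName("shop", "orders")
	fmt.Println(name, err)
}
```

Validating the name before calling CreateTable turns a remote API error into an early, local one, which is convenient when many tables are migrated in a loop.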
2.2 The main function
The main function only calls the other functions.
First, initialize the database connection and the Amazon DynamoDB client:
// Initialize database connection
db := Mysql.ConnectDB()
if db == nil {
fmt.Printf("init db failed!\n")
return
}
// Initialize the DynamoDB client
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("ap-southeast-1"))
if err != nil {
log.Fatalf("unable to load SDK config, %v", err)
}
// Using the Config value, create the DynamoDB client
svc := dynamodb.NewFromConfig(cfg)
Function calls:
// 1. Read the databases
Mysql.DatabaseInfo(db)
// 2. Read the tables
Mysql.TableInfo(db, database[i])
// 3. Get the table fields
Mysql.TableFiledInfo(db, database[i], table[j])
// 4. Create the DynamoDB table
DynamoDB.CreateDynamoDB(svc, field, tableName)
// 5.1 Get the data for all columns and the number of rows
Mysql.TableData(db, field, database[i], table[j])
// 5.3 Write the data to DynamoDB
DynamoDB.PutItemDynamoDB(svc, itemMap, tableName)
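The calls above are nested in loops over databases and tables. The following is a minimal sketch of that control flow, not the original main function: the `source` and `sink` interfaces and the in-memory `memSource` and `memSink` types are hypothetical stand-ins for the Mysql and DynamoDB packages, so the sketch runs without MySQL or AWS.

```go
package main

import "fmt"

// source and sink are hypothetical interfaces standing in for the Mysql and
// DynamoDB packages of the original program.
type source interface {
	Databases() []string
	Tables(database string) []string
	Rows(database, table string) []map[string]string
}

type sink interface {
	CreateTable(name string) error
	PutItem(table string, item map[string]string) error
}

// migrate walks every database and table, creates "<database>_<table>" in
// the sink and copies each row into it. It returns the number of rows copied.
func migrate(src source, dst sink) (int, error) {
	copied := 0
	for _, db := range src.Databases() {
		for _, tbl := range src.Tables(db) {
			name := db + "_" + tbl
			if err := dst.CreateTable(name); err != nil {
				return copied, fmt.Errorf("create %s: %w", name, err)
			}
			for _, row := range src.Rows(db, tbl) {
				if err := dst.PutItem(name, row); err != nil {
					return copied, fmt.Errorf("put into %s: %w", name, err)
				}
				copied++
			}
		}
	}
	return copied, nil
}

// memSource and memSink are in-memory stand-ins used only for this demo.
type memSource struct{}

func (memSource) Databases() []string    { return []string{"shop"} }
func (memSource) Tables(string) []string { return []string{"orders", "users"} }
func (memSource) Rows(_, table string) []map[string]string {
	return []map[string]string{{"id": "1", "table": table}, {"id": "2", "table": table}}
}

type memSink struct{ items map[string][]map[string]string }

func (s *memSink) CreateTable(name string) error {
	s.items[name] = nil
	return nil
}

func (s *memSink) PutItem(table string, item map[string]string) error {
	s.items[table] = append(s.items[table], item)
	return nil
}

func main() {
	dst := &memSink{items: map[string][]map[string]string{}}
	n, err := migrate(memSource{}, dst)
	fmt.Println(n, err)
}
```

Returning errors instead of printing them inside each step lets the caller decide whether to stop the whole migration or skip a single table.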
3. MySQL functions
3.1 Query all non-system databases
In MySQL, the INFORMATION_SCHEMA database stores metadata, mainly information about the tables and the table fields of every database.
The database names can be queried from the INFORMATION_SCHEMA.TABLES table:
SELECT table_schema databaseName
FROM INFORMATION_SCHEMA.TABLES
WHERE UPPER(table_type)='BASE TABLE'
AND table_schema NOT IN ('mysql','performance_schema','sys')
GROUP BY table_schema
ORDER BY table_schema asc
The condition UPPER(table_type)='BASE TABLE' selects only ordinary tables. Tables created by users have the type BASE TABLE, whereas views and the INFORMATION_SCHEMA tables do not.
The mysql, performance_schema, and sys databases also contain BASE TABLE tables, but they ship with MySQL and are not user databases, so they are excluded explicitly.
func DatabaseInfo(db *sql.DB) []string {
	sqlStr := `SELECT table_schema databaseName FROM INFORMATION_SCHEMA.TABLES WHERE UPPER(table_type)='BASE TABLE' AND table_schema NOT IN ('mysql','performance_schema','sys') GROUP BY table_schema ORDER BY table_schema asc`
	rows, err := db.Query(sqlStr)
	if err != nil {
		fmt.Printf("query database name error! err: %v\n", err)
		return nil
	}
	// Defer Close only after the error check: rows is nil when Query fails
	defer rows.Close()
	var result []string
	for rows.Next() {
		var databaseName string
		err = rows.Scan(&databaseName)
		if err != nil {
			fmt.Printf("scan database name error! err: %v\n", err)
			return nil
		}
		result = append(result, databaseName)
	}
	return result
}
Iterate over the query results and collect the database names into a slice.
3.2 Query table information
All tables in a database are queried from the INFORMATION_SCHEMA.TABLES table. The code logic is the same as for querying the database names.
sqlStr is the query statement; the ? placeholder receives the database name:
sqlStr := `SELECT table_name tableName FROM INFORMATION_SCHEMA.TABLES WHERE UPPER(table_type)='BASE TABLE' AND LOWER(table_schema) = ? GROUP BY table_name ORDER BY table_name asc`
Iterate over the query results and collect the table names into a slice.
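Since the database query and the table query share the same "iterate and collect one string column" logic, that part could be factored into one helper. The sketch below is an assumption about how this might look, not code from the original repository: `collectNames` and the `nameRows` interface are hypothetical names, and `fakeRows` is an in-memory stand-in so the example runs without a database. The `*sql.Rows` type from database/sql has all four methods of `nameRows`, so it can be passed in directly.

```go
package main

import (
	"errors"
	"fmt"
)

// nameRows is the subset of *sql.Rows used below; *sql.Rows satisfies it.
type nameRows interface {
	Next() bool
	Scan(dest ...any) error
	Err() error
	Close() error
}

// collectNames is a hypothetical helper: it reads a single string column
// from every row, always closes the rows, and reports iteration errors.
func collectNames(rows nameRows) ([]string, error) {
	defer rows.Close()
	var names []string
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return nil, fmt.Errorf("scan name: %w", err)
		}
		names = append(names, name)
	}
	return names, rows.Err()
}

// fakeRows is an in-memory stand-in used only for this demo.
type fakeRows struct {
	names  []string
	pos    int
	closed bool
}

func (f *fakeRows) Next() bool {
	f.pos++
	return f.pos <= len(f.names)
}

func (f *fakeRows) Scan(dest ...any) error {
	if len(dest) != 1 {
		return errors.New("fakeRows: want exactly one destination")
	}
	p, ok := dest[0].(*string)
	if !ok {
		return errors.New("fakeRows: want *string")
	}
	*p = f.names[f.pos-1]
	return nil
}

func (f *fakeRows) Err() error { return nil }

func (f *fakeRows) Close() error {
	f.closed = true
	return nil
}

func main() {
	names, err := collectNames(&fakeRows{names: []string{"orders", "users"}})
	fmt.Println(names, err)
}
```

With a real connection the call would look like `rows, err := db.Query(sqlStr, databaseName)` followed by `collectNames(rows)` once `err` has been checked.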
3.3 Query table field information
All fields of a table are queried from the INFORMATION_SCHEMA.COLUMNS table. The code logic is the same as for querying the database names.
Table field structure:
- Fname: the table field name
- ColumnKey: whether the table field is a primary key (PRI)
- dataType: the table field type
type Field struct {
Fname string
ColumnKey string
dataType string
}
sqlStr is the query statement; the two ? placeholders receive the database name and the table name:
sqlStr := `SELECT COLUMN_NAME fName,COLUMN_KEY columnKey,DATA_TYPE dataType FROM information_schema.columns WHERE table_schema = ? AND table_name = ?`
The function returns a slice of table field structures.
3.4 Query all data in a table
With Go's database/sql package and the github.com/go-sql-driver/mysql driver, Scan needs exactly as many destination variables as the query result has columns.
Official documentation: https://pkg.go.dev/database/sql#Row.Scan
Therefore I take the table fields returned by the previous function, query the fields one at a time, and collect the results into a map keyed by column.
- Iterate over the field slice and use Query to fetch the values of that column from the table
- Use rows.Next to iterate over the query results and rows.Scan to read each column value, appending it to a slice
- Use a map of the form key (field name) : value (slice of values) to store all the data of a table
func TableData(db *sql.DB, field []Field, database, table string) (map[string][]string, int) {
	result := make(map[string][]string)
	var rowsLength int
	for i := 0; i < len(field); i++ {
		// Identifiers cannot be bound with ?, so quote them with backticks
		sqlStr := "SELECT `" + field[i].Fname + "` FROM `" + database + "`.`" + table + "`"
		rows, err := db.Query(sqlStr)
		if err != nil {
			fmt.Printf("Failed to query table! error: %v\n", err)
			return nil, 0
		}
		// sql.NullString lets Scan accept NULL; a NULL value is stored as ""
		var columnValue sql.NullString
		var oneResult []string
		for rows.Next() {
			err = rows.Scan(&columnValue)
			if err != nil {
				rows.Close()
				fmt.Printf("Failed to scan a field in a table! err: %v\n", err)
				return nil, 0
			}
			oneResult = append(oneResult, columnValue.String)
		}
		// Close here instead of deferring inside the loop, where the
		// deferred calls would only run when the function returns
		rows.Close()
		if len(oneResult) == 0 {
			fmt.Printf("%v.%v has no data!\n", database, table)
			return nil, 0
		}
		result[field[i].Fname] = oneResult
		rowsLength = len(oneResult)
	}
	return result, rowsLength
}
- db: *sql.DB, the database connection
- field: []Field, the table fields
- database: string, the database name
- table: string, the table name
- return: map[string][]string, the data of each column, and int, the number of rows
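Querying one column at a time issues one query per field, and because the queries have no ORDER BY, nothing guarantees that row k of one column belongs to the same record as row k of another. An alternative is to read whole rows in a single query by asking the result set for its column names and handing Scan one pointer per column. The sketch below illustrates that technique under stated assumptions: `readRows` and `columnRows` are hypothetical names, NULL values are simply left out of the row map, and `fakeRows` is an in-memory stand-in so the example runs without MySQL. `*sql.Rows` provides all four methods of `columnRows`.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// columnRows is the subset of *sql.Rows used below; *sql.Rows satisfies it.
type columnRows interface {
	Columns() ([]string, error)
	Next() bool
	Scan(dest ...any) error
	Err() error
}

// readRows is a hypothetical helper that scans every column of every row in
// one pass. NULL values are left out of the resulting row map.
func readRows(rows columnRows) ([]map[string]string, error) {
	cols, err := rows.Columns()
	if err != nil {
		return nil, err
	}
	values := make([]sql.NullString, len(cols))
	targets := make([]any, len(cols))
	for i := range values {
		targets[i] = &values[i] // Scan needs one pointer per column
	}
	var out []map[string]string
	for rows.Next() {
		if err := rows.Scan(targets...); err != nil {
			return nil, err
		}
		row := make(map[string]string, len(cols))
		for i, col := range cols {
			if values[i].Valid {
				row[col] = values[i].String
			}
		}
		out = append(out, row)
	}
	return out, rows.Err()
}

// fakeRows is an in-memory stand-in used only for this demo.
// A nil cell plays the role of SQL NULL.
type fakeRows struct {
	cols []string
	data [][]*string
	pos  int
}

func (f *fakeRows) Columns() ([]string, error) { return f.cols, nil }
func (f *fakeRows) Err() error                 { return nil }

func (f *fakeRows) Next() bool {
	f.pos++
	return f.pos <= len(f.data)
}

func (f *fakeRows) Scan(dest ...any) error {
	for i, cell := range f.data[f.pos-1] {
		ns, ok := dest[i].(*sql.NullString)
		if !ok {
			return errors.New("fakeRows: want *sql.NullString")
		}
		if cell == nil {
			*ns = sql.NullString{}
		} else {
			*ns = sql.NullString{String: *cell, Valid: true}
		}
	}
	return nil
}

func main() {
	id, name := "1", "alice"
	rows := &fakeRows{
		cols: []string{"id", "name"},
		data: [][]*string{{&id, &name}, {&id, nil}},
	}
	out, err := readRows(rows)
	fmt.Println(len(out), out[0]["name"], err)
}
```

Each returned map already has the shape of one DynamoDB item, so the later write loop would not need to re-assemble rows from separate column slices.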
4. Amazon DynamoDB
4.1 Create the DynamoDB table
The primary key of a DynamoDB table consists of a partition key and an optional sort key. Amazon DynamoDB also supports global and local secondary indexes, but that approach is more complex, so only the partition key and the sort key are used here.
A DynamoDB table therefore has at most two key attributes, and the partition key is mandatory.
A MySQL table, however, may have more than two primary key columns or none at all. To handle both cases, the program checks the field attributes obtained earlier.
If there are two or more primary key columns, the first two in the query results become the partition key and the sort key respectively; if there is no primary key, the first column of the query results becomes the partition key.
The best approach would be to write an interface and adapt each table to the actual production requirements when converting it to the DynamoDB format.
In addition, every attribute is created as a String; the original field types are not examined. As far as I understand, the best approach would be to use Go's reflection mechanism to determine the DynamoDB field type.
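The key-selection rule described above can be isolated into a small pure function, which makes it easy to test separately from the AWS call. This is a sketch under assumptions: `chooseKeys` is a hypothetical name, and `Field` is reduced to the two members the rule needs.

```go
package main

import (
	"errors"
	"fmt"
)

// Field mirrors the struct from section 3.3, reduced to the parts used here.
type Field struct {
	Fname     string
	ColumnKey string
}

// chooseKeys is a hypothetical helper that applies the rule from the text:
// the first PRI column becomes the partition key, the second PRI column the
// sort key, and the first column is the partition key when no PRI exists.
func chooseKeys(fields []Field) (partition, sortKey string, err error) {
	if len(fields) == 0 {
		return "", "", errors.New("table has no columns")
	}
	var pri []string
	for _, f := range fields {
		if f.ColumnKey == "PRI" {
			pri = append(pri, f.Fname)
		}
	}
	switch len(pri) {
	case 0:
		return fields[0].Fname, "", nil
	case 1:
		return pri[0], "", nil
	default:
		return pri[0], pri[1], nil // further primary key columns are ignored
	}
}

func main() {
	fields := []Field{{"note", ""}, {"order_id", "PRI"}, {"line_no", "PRI"}}
	p, s, err := chooseKeys(fields)
	fmt.Println(p, s, err)
}
```

One consequence of this rule is worth keeping in mind: when a MySQL primary key has more than two columns, or when a table has no primary key, different MySQL rows can map to the same DynamoDB key, and a later PutItem then overwrites the earlier item.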
func CreateDynamoDB(svc *dynamodb.Client, field [] Mysql.Field, tableName string) *dynamodb.CreateTableOutput {
var attributeDefinitions []types.AttributeDefinition
var keySchema []types.KeySchemaElement
for i :=0; i < len(field); i++ {
if (field[i].ColumnKey == "PRI") && (len(attributeDefinitions) < 1) {
// The first primary key is used as the partition key
Attribute := []types.AttributeDefinition{
{
AttributeName: aws.String(field[i].Fname),
AttributeType: types.ScalarAttributeTypeS,
},
}
schemaElement := []types.KeySchemaElement{
{
AttributeName: aws.String(field[i].Fname),
KeyType: types.KeyTypeHash,
},
}
attributeDefinitions = append(attributeDefinitions, Attribute...)
keySchema = append(keySchema, schemaElement...)
} else if (field[i].ColumnKey == "PRI") && (len(attributeDefinitions) >= 1) {
// The second primary key is used as the sort key
Attribute := []types.AttributeDefinition{
{
AttributeName: aws.String(field[i].Fname),
AttributeType: types.ScalarAttributeTypeS,
},
}
schemaElement := []types.KeySchemaElement{
{
AttributeName: aws.String(field[i].Fname),
KeyType: types.KeyTypeRange,
},
}
attributeDefinitions = append(attributeDefinitions, Attribute...)
keySchema = append(keySchema, schemaElement...)
}
// When there are more than two primary keys , Select only the first two primary keys
if len(attributeDefinitions) >= 2 {
fmt.Printf("The database primary key is greater than or equal to 2!tableName:%v\n", tableName)
break
}
}
// If there is no primary key, use the first table field as the DynamoDB partition key
if len(attributeDefinitions) == 0 {
attributeDefinitions = []types.AttributeDefinition{
{
AttributeName: aws.String(field[0].Fname),
AttributeType: types.ScalarAttributeTypeS,
},
}
keySchema = []types.KeySchemaElement{
{
AttributeName: aws.String(field[0].Fname),
KeyType: types.KeyTypeHash,
},
}
fmt.Printf("No primary key exists in the database!tableName:%v\n", tableName)
}
//fmt.Println(attributeDefinitions[1].AttributeName)
input := &dynamodb.CreateTableInput{
AttributeDefinitions: attributeDefinitions,
KeySchema: keySchema,
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(5),
WriteCapacityUnits: aws.Int64(5),
},
TableName: aws.String(tableName),
}
result, err := svc.CreateTable(context.TODO(),input)
if err != nil {
fmt.Printf("Failed to create DynamoDB! error: %v\n", err)
return nil
}
// CreateTable is asynchronous, so wait a while before moving on to the next step
time.Sleep(time.Second * 5)
return result
}
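The fixed five-second sleep at the end of CreateDynamoDB either waits longer than necessary or not long enough if the table takes more time to become ACTIVE. A more robust pattern is to poll the table status until it is ready. The sketch below shows only the generic polling loop: `waitUntil` and `demo` are hypothetical names, and the check function is a stand-in for a DescribeTable call. The AWS SDK for Go v2 also ships a TableExistsWaiter (created with `dynamodb.NewTableExistsWaiter`) that serves this purpose.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitUntil polls check every interval until it reports true, returns an
// error, or ctx is done. It is a hypothetical helper, not an SDK function.
func waitUntil(ctx context.Context, interval time.Duration, check func(context.Context) (bool, error)) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		ok, err := check(ctx)
		if err != nil {
			return err
		}
		if ok {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// demo runs waitUntil against a stand-in for a DescribeTable call that
// reports the table as ACTIVE on the third poll. It returns the number of
// polls that were made.
func demo() (int, error) {
	calls := 0
	tableActive := func(context.Context) (bool, error) {
		calls++
		return calls >= 3, nil
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	err := waitUntil(ctx, 10*time.Millisecond, tableActive)
	return calls, err
}

func main() {
	fmt.Println(demo())
}
```

The context timeout bounds the total wait, so a table that never becomes ACTIVE produces an error instead of blocking the migration forever.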
4.2 Insert data
Iterate over all the data read from the MySQL table, put each row into a map in the format DynamoDB expects, and call PutItem to add the data.
All values are added as Strings; the original field types are not examined. As far as I understand, the best approach would be to use Go's reflection mechanism to determine the DynamoDB field type.
for k := 0; k < rowLength; k++ {
	itemMap := make(map[string]types.AttributeValue)
	for itemName, item := range tableData {
		itemMap[itemName] = &types.AttributeValueMemberS{Value: item[k]}
	}
	// 5.3 Write the data to DynamoDB
	putItemResult := DynamoDB.PutItemDynamoDB(svc, itemMap, tableName)
	if putItemResult == nil {
		// PutItemDynamoDB has already printed the error; panic with a
		// message, because panicking with the nil result carries no information
		panic("put item failed, tableName: " + tableName)
	}
	fmt.Println("put Item succeed!")
}
func PutItemDynamoDB(svc *dynamodb.Client, itemMap map[string]types.AttributeValue, tableName string) *dynamodb.PutItemOutput{
input := &dynamodb.PutItemInput{
Item: itemMap,
ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal,
TableName: aws.String(tableName),
}
result, err := svc.PutItem(context.TODO(),input)
if err != nil {
fmt.Printf("Failed to put Item! error: %v\n", err)
return nil
}
return result
}
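As noted above, the program stores every attribute as a String. Because the DATA_TYPE column from section 3.3 is already available in the Field struct, one alternative to reflection is a plain lookup from the MySQL type name to a DynamoDB scalar type. The mapping below is an assumption for illustration, not the original program's behaviour: `scalarType` is a hypothetical name, and temporal, JSON and other types deliberately fall back to String.

```go
package main

import (
	"fmt"
	"strings"
)

// scalarType maps a MySQL DATA_TYPE value to a DynamoDB scalar type code:
// "N" (number), "B" (binary) or "S" (string). The mapping is an assumption
// for illustration and does not come from the original program.
func scalarType(mysqlType string) string {
	switch strings.ToLower(mysqlType) {
	case "tinyint", "smallint", "mediumint", "int", "bigint",
		"decimal", "float", "double":
		return "N"
	case "binary", "varbinary", "tinyblob", "blob", "mediumblob", "longblob":
		return "B"
	default:
		return "S" // char, varchar, text, date, datetime, json, ...
	}
}

func main() {
	for _, t := range []string{"int", "varchar", "blob", "DATETIME"} {
		fmt.Println(t, scalarType(t))
	}
}
```

The returned code would decide both the AttributeType used when creating key attributes and which AttributeValue member (number, binary or string) is built for each value when writing items.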
Appendix:
References
MySQL Driver:https://github.com/Go-SQL-Driver/MySQL/
AWS Go DynamoDB SDKv2:https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/dynamodb#Client.PutItem
Source code
GitHub:https://github.com/MoGD2018/Go-mysql-convert-to-dynamodb
Gitee:https://gitee.com/MoGD/Go-mysql-convert-to-dynamodb
Copyright notice
This article was written by [Heavy dust]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/04/202204230413460365.html