-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathbuilder.go
More file actions
112 lines (95 loc) · 2.57 KB
/
builder.go
File metadata and controls
112 lines (95 loc) · 2.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package avro
import (
"strconv"
"strings"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/types"
"github.com/sunary/sqlize/element"
)
// NewAvroSchema ...
func NewAvroSchema(table element.Table) *RecordSchema {
fields := buildFieldsFromTable(table)
record := newRecordSchema(table.Name, table.Name)
record.Name = table.Name
//set payload , on before field
record.Fields[0].Type = []interface{}{
"null",
RecordSchema{
Name: "Value",
Type: "record",
Fields: fields,
},
}
return record
}
func buildFieldsFromTable(table element.Table) []Field {
fields := []Field{}
for _, col := range table.Columns {
field := Field{
Name: col.Name,
}
if col.HasDefaultValue() {
field.Type = []interface{}{"null", getAvroType(col)}
field.Default = nil
} else {
field.Type = getAvroType(col)
}
fields = append(fields, field)
}
return fields
}
func getAvroType(col element.Column) interface{} {
switch col.GetType() {
//case mysql.TypeBit, mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob:
// return "[]byte"
case mysql.TypeTiny:
return "bool"
case mysql.TypeEnum:
return map[string]interface{}{
"type": "string",
"connect.version": 1,
"connect.parameters": map[string]string{
"allowed": strings.Join(col.CurrentAttr.MysqlType.GetElems(), ","),
},
"connect.default": "init",
"connect.name": "io.debezium.data.Enum",
}
}
switch col.CurrentAttr.MysqlType.EvalType() {
case types.ETInt:
return "int"
case types.ETDecimal:
displayFlen, displayDecimal := col.CurrentAttr.MysqlType.GetFlen(), col.CurrentAttr.MysqlType.GetDecimal()
return map[string]interface{}{
"type": "bytes",
"scale": displayDecimal,
"precision": displayFlen,
"connect.version": 1,
"connect.parameters": map[string]string{
"scale": strconv.Itoa(displayDecimal),
"connect.decimal.precision": strconv.Itoa(displayFlen),
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal",
}
case types.ETReal:
return "float64"
case types.ETDatetime, types.ETTimestamp:
return map[string]interface{}{
"type": "string",
"connect.version": 1,
"connect.default": "1970-01-01T00:00:00Z",
"connect.name": "io.debezium.time.ZonedTimestamp",
}
case types.ETJson:
return map[string]interface{}{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.data.Json",
}
case types.ETString:
return "string"
default:
return "string"
}
}