当前位置 博文首页 > Perl访问MSSQL并迁移到MySQL数据库脚本实例

    Perl访问MSSQL并迁移到MySQL数据库脚本实例

    作者:admin 时间:2021-02-02 18:15

    Linux下没有专门为MSSQL设计的访问库,不过介于MSSQL本是从sybase派生出来的,因此用来访问Sybase的库自然也能访问MSSQL,FreeTDS就是这么一个实现。
    Perl中通常使用DBI来访问数据库,因此在系统安装了FreeTDS之后,可以使用DBI来通过FreeTDS来访问MSSQL数据库,例子:

    复制代码 代码如下:

    using DBI;
    my $cs = "DRIVER={FreeTDS};SERVER=主机;PORT=1433;DATABASE=数据库;UID=sa;PWD=密码;TDS_VERSION=7.1;charset=gb2312";
    my $dbh = DBI->connect("dbi:ODBC:$cs") or die $@;

    因为本人不怎么用windows,为了研究QQ群数据库,需要将数据从MSSQL中迁移到MySQL中,特地为了QQ群数据库安装了一个Windows Server 2008和SQL Server 2008r2,不过过几天评估就到期了,研究过MySQL的Workbench有从MS SQL Server迁移数据的能力,不过对于QQ群这种巨大数据而且分表分库的数据来说显得太麻烦,因此写了一个通用的perl脚本,用来将数据库从MSSQL到MySQL迁移,结合bash,很方便的将这二十多个库上百张表给转移过去了,Perl代码如下:
    复制代码 代码如下:

    #!/usr/bin/perl
    use strict;
    use warnings;
    use DBI;


    die "Usage: qq db\n" if @ARGV != 1;
    my $db = $ARGV[0];

    print "Connectin to databases $db...\n";
    my $cs = "DRIVER={FreeTDS};SERVER=MSSQL的服务器;PORT=1433;DATABASE=$db;UID=sa;PWD=MSSQL密码;TDS_VERSION=7.1;charset=gb2312";

    sub db_connect
    {
        my $src = DBI->connect("dbi:ODBC:$cs") or die $@;
        my $target = DBI->connect("dbi:mysql:host=MySQL服务器", "MySQL用户名", "MySQL密码") or die $@;
        return ($src, $target);
    }
    my ($src, $target) = db_connect;

    print "Reading table schemas....\n";

    my $q_tables = $src->prepare("SELECT name FROM sysobjects WHERE xtype = 'U' AND name != 'dtproperties';");#获取所有表名
    my $q_key_usage = $src->prepare("SELECT TABLE_NAME, COLUMN_NAME from INFORMATION_SCHEMA.KEY_COLUMN_USAGE;");#获取表的主键
    $q_tables->execute;
    my @tables = ();
    my %keys = ();
    push @tables, @_ while @_ = $q_tables->fetchrow_array;

    $q_tables->finish;

    $q_key_usage->execute();
    $keys{$_[0]} = $_[1] while @_ = $q_key_usage->fetchrow_array;
    $q_key_usage->finish;


    #获取表的索引信息
    my $q_index = $src->prepare(qq(
        SELECT T.name, C.name
        FROM sys.index_columns I
        INNER JOIN sys.tables T ON T.object_id = I.object_id
        INNER JOIN sys.columns C ON C.column_id = I.column_id AND I.object_id = C.object_id;
    ));
    $q_index->execute;
    my %table_indices = ();
    while(my @row = $q_index->fetchrow_array)
    {
        my ($table, $column) = @row;
        my $columns = $table_indices{$table};
        $columns = $table_indices{$table} = [] if not $columns;
        push @$columns, $column;
    }
    $q_index->finish;

    #在目标MySQL上创建对应的数据库
    $target->do("DROP DATABASE IF EXISTS `$db`;") or die "Cannot drop old database $db\n";
    $target->do("CREATE DATABASE `$db` DEFAULT CHARSET = utf8 COLLATE utf8_general_ci;") or die "Cannot create database $db\n";
    $target->disconnect;
    $src->disconnect;


    my $total_start = time;
    for my $table(@tables)
    {
        my $pid = fork;
        unless($pid)
        {
            ($src, $target) = db_connect;
            my $start = time;
            $src->do("USE $db;");
            #获取表结构,用来生成MySQL用的DDL
            my $q_schema = $src->prepare("SELECT COLUMN_NAME, IS_NULLABLE, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH from INFORMATION_SCHEMA.COLUMNS where TABLE_NAME = ? ORDER BY ORDINAL_POSITION;");
            $target->do("USE `$db`;");
            $target->do("SET NAMES utf8;");
            my $key_column = $keys{$table};
            my $ddl = "CREATE TABLE `$table` ( \n";
            $q_schema->execute($table);
            my @fields = ();
            while(my @row = $q_schema->fetchrow_array)
            {
                my ($column, $nullable, $datatype, $length) = @row;
                my $field = "`$column` $datatype";
                $field .= "($length)" if $length;
                $field .= " PRIMARY KEY" if $key_column eq $column;
                push @fields, $field;
            }
            $ddl .= join(",\n", @fields);
            $ddl .= "\n) ENGINE = MyISAM;\n\n";
            $target->do($ddl) or die "Cannot create table $table\n";
            #创建索引
            my $indices = $table_indices{$table};
            if($indices)
            {
                for(@$indices)
                {
                    $target->do("CREATE INDEX `$_` ON `$table`(`$_`);\n") or die "Cannot create index on $db.$table$.$_\n";
                }
            }
            #转移数据
            my @placeholders = map {'?'} @fields;
            my $insert_sql = "INSERT DELAYED INTO $table VALUES(" .(join ', ', @placeholders) . ");\n";
            my $insert = $target->prepare($insert_sql);
            my $select = $src->prepare("SELECT * FROM $table;");
            $select->execute;
            $select->{'LongReadLen'} = 1000;
            $select->{'LongTruncOk'} = 1;
            $target->do("SET AUTOCOMMIT = 0;");
            $target->do("START TRANSACTION;");
            my $rows = 0;
            while(my @row = $select->fetchrow_array)
            {
                $insert->execute(@row);
                $rows++;
            }
            $target->do("COMMIT;");
            #结束,输出任务信息
            my $elapsed = time - $start;
            print "Child process $$ for table $db.$table done, $rows records, $elapsed seconds.\n";
            exit(0);
        }
    }
    print "Waiting for child processes\n";
    #等待所有子进程结束
    while (wait() != -1) {}
    my $total_elapsed = time - $total_start;
    print "All tasks from $db finished, $total_elapsed seconds.\n";

    这个脚本会根据每一个表fork出一个子进程和相应的数据库连接,因此做这种迁移之前得确保目标MySQL数据库配置的最大连接数能承受。
    然后在bash下执行

    复制代码 代码如下:

    for x in {1..11};do ./qq.pl QunInfo$x; done
    for x in {1..11};do ./qq.pl GroupData$x; done

    就不用管了,脚本会根据MSSQL这边表结构来在MySQL那边创建一样的结构并配置索引。

    js
    下一篇:没有了