Edit file File name : wordpress.py Content :import os import re import sys import tempfile from cement import shell from cement.utils.misc import minimal_logger from dhwp.core.curl import fetch LOG = minimal_logger(__name__) class Wordpress: def __init__(self, path, user=None): self.path = path self.user = user def cli(self, cmd, **kwargs): out, err, code = shell.cmd('which wp') if code > 0: LOG.fatal("cannot find wpcli, please install it first") sys.exit(1) args = [('--path=%s' % self.path), cmd] for key, value in kwargs.items(): key = re.sub(r"__", '-', key) if isinstance(value, bool): if value: args.append("--%s" % key) else: args.append("--%s=%s" % (key, value)) command = "wp --no-color " + " ".join(args) if (self.user): command = "cd " + self.path + "; sudo -u {user} {command}".format(user=self.user, command=command) LOG.info("wpcli command : %s" % command) out, err, code = shell.cmd(command) out = out.decode("utf-8") err = err.decode("utf-8") LOG.info("wpcli output\n%s" % out) if code > 0: LOG.fatal("wpcli failed with error\n%s" % err) sys.exit(1) return out def db_import(self, sql_file): return self.cli('db import ' + sql_file) def db_export(self, sql_file=None): if not sql_file: _, sql_file = tempfile.mkstemp() self.cli('db export --set-gtid-purged=OFF ' + sql_file) return sql_file def site_url(self): # TODO look at wp config file first then here?? site_url = self.cli('option get siteurl --skip-plugins --skip-themes').rstrip('\n') return site_url def blogname(self): blogname = self.cli('option get blogname --skip-plugins --skip-themes').rstrip('\n') return blogname def db_prefix(self): db_prefix = self.cli('config get table_prefix').rstrip('\n') return db_prefix def is_installed(self): command = "wp --quiet --path=%s core is-installed > /dev/null 2>&1" % self.path code = shell.cmd(command, capture=False) if code > 0: return False else: return True def wp_import(self): pass def rename(self, old_url, new_url): # self.app.log.info("Updating WP DB backup for staging") # with fileinput.FileInput(stage_db_file, inplace=True, backup='.bak') as file: # for line in file: # print(line.replace(live_prefix, stage_prefix), end='') self.cli("search-replace %s %s --skip-plugins --skip-themes --all-tables-with-prefix" % (old_url, new_url)) def download_source(self, location): dest_file = self.path + '/wordpress.zip' fetch(location, dest_file) extract_cmd = """unzip -q {wpfile} -d {path} && rm {wpfile} """.format( wpfile=dest_file, path=self.path ) out, err, code = shell.cmd(extract_cmd) if code > 0: self.app.log.fatal("error extracting %s : %s" % (dest_file, err)) sys.exit(1) move_cmd = """mv {path}/wordpress/* {path} && rm -rf {path}/wordpress""".format( path=self.path ) out, err, code = shell.cmd(move_cmd) if code > 0: self.app.log.fatal("error moving %s : %s" % (dest_file, err)) sys.exit(1) def download_package(self, location): dest_file = None unlink_file_after = False if os.path.exists(location): dest_file = location else: _, dest_file = tempfile.mkstemp() fetch(location, dest_file) unlink_file_after = True extract_cmd = """cd {path} && tar zxvf {dest_file}""".format( path=self.path, dest_file=dest_file ) out, err, code = shell.cmd(extract_cmd) if code > 0: self.app.log.fatal( "error extracting package %s : %s" % (dest_file, err)) sys.exit(1) if unlink_file_after: os.unlink(dest_file) def download_extra(self, location, rel_dir): dest_file = None unlink_file_after = False if os.path.exists(location): dest_file = location else: _, dest_file = tempfile.mkstemp() LOG.info("Downloading extra from {location}".format(location=location)) fetch(location, dest_file) unlink_file_after = True extract_cmd = """cd {path}{subpath} && unzip -o {dest_file}""".format( path=self.path, subpath=rel_dir, dest_file=dest_file ) LOG.info("Installing extra to {path}{subpath}".format( path=self.path, subpath=rel_dir)) out, err, code = shell.cmd(extract_cmd) if code > 0: LOG.fatal( "error extracting extra %s to %s : %s" % (dest_file, rel_dir, err)) sys.exit(1) if unlink_file_after: os.unlink(dest_file) Save