Как UPSERT (MERGE, INSERT … ON DUPLICATE UPDATE) в PostgreSQL?

Очень часто задаваемый вопрос здесь заключается в том, как сделать upsert, что и вызывает MySQL INSERT ... ON DUPLICATE UPDATE а стандартная поддержка – как часть операции MERGE .

Учитывая, что PostgreSQL не поддерживает его напрямую (до pg 9.5), как вы это делаете? Рассмотрим следующее:

 CREATE TABLE testtable ( id integer PRIMARY KEY, somedata text NOT NULL ); INSERT INTO testtable (id, somedata) VALUES (1, 'fred'), (2, 'bob'); 

Теперь представьте, что вы хотите «усовершенствовать» кортежи (2, 'Joe') , (3, 'Alan') , поэтому новое содержимое таблицы будет:

 (1, 'fred'), (2, 'Joe'), -- Changed value of existing tuple (3, 'Alan') -- Added new tuple 

Об этом говорят люди при обсуждении вопроса. Крайне важно, что любой подход должен быть безопасным при наличии нескольких транзакций, работающих в одной и той же таблице, либо путем явной блокировки, либо в противном случае защищать от возникающих условий гонки.

Этот раздел подробно обсуждается в разделе « Вставка», дублирующее обновление в PostgreSQL? , но речь идет об альтернативах синтаксису MySQL, и с течением времени он вырастил справедливый бит несвязанных деталей. Я работаю над окончательными ответами.

Эти методы также полезны для «вставить, если не существует, в противном случае ничего не делать», то есть «вставить … при дублировании ключа игнорировать».

    9.5 и новее:

    PostgreSQL 9.5 и более новая поддержка INSERT ... ON CONFLICT UPDATEON CONFLICT DO NOTHING ), то есть upsert.

    Сравнение с ON DUPLICATE KEY UPDATE .

    Быстрое объяснение .

    Для использования см. Руководство – в частности, предложение conflict_action в синтаксической диаграмме и пояснительный текст .

    В отличие от решений для 9.4 и старше, которые приведены ниже, эта функция работает с несколькими конфликтующими строками и не требует эксклюзивной блокировки или цикла повтора.

    Коммит, добавляющий эту функцию, здесь, и обсуждение вокруг его разработки здесь .


    Если вы на 9,5 и не нуждаетесь в обратной совместимости, вы можете перестать читать сейчас .


    9,4 и старше:

    PostgreSQL не имеет встроенного средства UPSERT (или MERGE ), и сделать это эффективно перед лицом одновременного использования очень сложно.

    В этой статье обсуждается проблема в деталях .

    В общем, вы должны выбрать один из двух вариантов:

    • Отдельные операции вставки / обновления в цикле повтора; или
    • Блокировка стола и выполнение пакетного слияния

    Индивидуальный цикл повторения строк

    Использование отдельных строк upserts в цикле повторов является разумным вариантом, если вы хотите, чтобы многие соединения одновременно пытались выполнить вставки.

    Документация PostgreSQL содержит полезную процедуру, которая позволит вам сделать это в цикле внутри базы данных . Он защищает от утраченных обновлений и вставляет гонки, в отличие от самых наивных решений. Он будет работать только в режиме READ COMMITTED и безопасен только в том случае, если это единственное, что вы делаете в транзакции. Функция будет работать некорректно, если триггеры или вторичные уникальные ключи вызовут уникальные нарушения.

    Эта страtagsя очень неэффективна. Всякий раз, когда это практично, вы должны ставить в очередь работу и выполнять объемный взнос, как описано ниже.

    Многие попытки решения этой проблемы не учитывают откаты, поэтому они приводят к неполным обновлениям. Две сделки расходятся друг с другом; один из них успешно INSERT ; другой получает дублируемую ключевую ошибку и вместо этого выполняет UPDATE . Блоки UPDATE ожидающие INSERT или фиксации INSERT . Когда он откатывается, переопределение состояния UPDATE соответствует нулевым строкам, поэтому, даже если UPDATE совершает это, на самом деле это не сделало ожидаемого. Вы должны проверить количество строк результатов и повторить попытку там, где это необходимо.

    Некоторые попытки решения также не учитывают гонки SELECT. Если вы попробуете очевидное и простое:

     -- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE. BEGIN; UPDATE testtable SET somedata = 'blah' WHERE id = 2; -- Remember, this is WRONG. Do NOT COPY IT. INSERT INTO testtable (id, somedata) SELECT 2, 'blah' WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2); COMMIT; 

    то, когда два запускаются сразу, есть несколько режимов отказа. Одна из них – это уже обсуждавшаяся проблема с повторной проверкой обновлений. Другой – это то, где оба UPDATE одновременно, сопоставляя нулевые строки и продолжая. Затем они оба выполняют тест EXISTS , который происходит до INSERT . Оба получают нулевые строки, поэтому оба делают INSERT . Ошибка не выполняется с повторяющейся ключевой ошибкой.

    Вот почему вам нужен цикл повторной попытки. Вы можете подумать, что можно предотвратить дублирование ключевых ошибок или потерянных обновлений с помощью умного SQL, но вы не можете. Вам нужно проверить количество строк или обработать повторяющиеся ключевые ошибки (в зависимости от выбранного подхода) и повторить попытку.

    Пожалуйста, не сворачивайте свое собственное решение. Как и в случае с очередью сообщений, это, вероятно, неправильно.

    Массовое upsert с блокировкой

    Иногда вы хотите выполнить массовое обновление, где у вас есть новый dataset, который вы хотите объединить в более старый существующий dataset. Это намного эффективнее, чем индивидуальные upserts и должно быть предпочтительным, когда это целесообразно.

    В этом случае вы обычно выполняете следующий процесс:

    • CREATE TEMPORARY таблицу

    • COPY или bulk – вставьте новые данные в временную таблицу

    • LOCK таблицу целей в IN EXCLUSIVE MODE . Это позволяет другим транзакциям SELECT , но не вносить никаких изменений в таблицу.

    • Сделайте UPDATE ... FROM существующих записей, используя значения в таблице temp;

    • Сделайте INSERT строк, которые еще не существуют в целевой таблице;

    • COMMIT , освободив замок.

    Например, для примера, приведенного в вопросе, используя многозначный INSERT для заполнения таблицы temp:

     BEGIN; CREATE TEMPORARY TABLE newvals(id integer, somedata text); INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan'); LOCK TABLE testtable IN EXCLUSIVE MODE; UPDATE testtable SET somedata = newvals.somedata FROM newvals WHERE newvals.id = testtable.id; INSERT INTO testtable SELECT newvals.id, newvals.somedata FROM newvals LEFT OUTER JOIN testtable ON (testtable.id = newvals.id) WHERE testtable.id IS NULL; COMMIT; 

    Связанное чтение

    • Страница wiki для UPSERT
    • UPSERTisms в Postgres
    • Вставить, дублировать обновление в PostgreSQL?
    • http://petereisentraut.blogspot.com/2010/05/merge-syntax.html
    • Упростить транзакцию
    • Является ли SELECT или INSERT функцией, склонной к условиям гонки?
    • SQL MERGE на вики- MERGE PostgreSQL
    • Самый идиоматический способ внедрения UPSERT в Postgresql в настоящее время

    Как насчет MERGE ?

    SQL-стандарт MERGE фактически имеет слабо определенную семантику параллелизма и не подходит для воссоздания без блокировки таблицы в первую очередь.

    Это действительно полезный оператор OLAP для слияния данных, но на самом деле это не полезное решение для повышения безопасности при параллелизме. Существует много советов для людей, использующих другие СУБД, для использования MERGE для upserts, но на самом деле это неправильно.

    Другие БД:

    • INSERT ... ON DUPLICATE KEY UPDATE в MySQL
    • MERGE из MS SQL Server (но см. Выше о проблемах MERGE )
    • MERGE из Oracle (но см. Выше о проблемах MERGE )

    Я пытаюсь внести вклад в другое решение для одной проблемы с вставкой с версиями PostgreSQL до 9,5. Идея состоит в том, чтобы просто попытаться выполнить сначала вставку, и в случае, если запись уже присутствует, обновить ее:

     do $$ begin insert into testtable(id, somedata) values(2,'Joe'); exception when unique_violation then update testtable set somedata = 'Joe' where id = 2; end $$; 

    Обратите внимание, что это решение может применяться только в том случае, если нет исключений строк таблицы .

    Я не знаю об эффективности этого решения, но он кажется мне достаточно разумным.

    Вот несколько примеров для insert ... on conflict ... ( pg 9.5+ ):

    • Вставить, в конфликт – ничего не делать .
      insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict do nothing;

    • Вставить, в конфликте – обновить , указать конфликтную цель через столбец .
      insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict(id) do update set name = 'new_name', size = 3;

    • Вставить, в конфликте – обновить , указать конфликтную цель с помощью имени ограничения .
      insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict on constraint dummy_pkey do update set name = 'new_name', size = 4;

     WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2 RETURNING ID), INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD)) INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS 

    Протестировано на Postgresql 9.3

    SQLAlchemy upsert для Postgres> = 9.5

    Поскольку большая публикация выше охватывает множество различных подходов SQL для версий Postgres (не только не 9,5, как в вопросе), я хотел бы добавить, как это сделать в SQLAlchemy, если вы используете Postgres 9.5. Вместо того, чтобы реализовать свой собственный upsert, вы также можете использовать функции SQLAlchemy (которые были добавлены в SQLAlchemy 1.1). Лично я бы рекомендовал использовать их, если это возможно. Не только из-за удобства, но и потому, что он позволяет PostgreSQL обрабатывать любые условия гонки, которые могут возникнуть.

    Перекрестная проводка из другого ответа, который я дал вчера ( https://stackoverflow.com/a/44395983/2156909 )

    SQLAlchemy теперь поддерживает ON CONFLICT с помощью двух методов on_conflict_do_update() и on_conflict_do_nothing() :

    Копирование из документации:

     from sqlalchemy.dialects.postgresql import insert stmt = insert(my_table).values(user_email='[email protected]', data='inserted data') stmt = stmt.on_conflict_do_update( index_elements=[my_table.c.user_email], index_where=my_table.c.user_email.like('%@gmail.com'), set_=dict(data=stmt.excluded.data) ) conn.execute(stmt) 

    http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict#insert-on-conflict-upsert

    Поскольку этот вопрос был закрыт, я публикую здесь, как вы это делаете, используя SQLAlchemy. Через рекурсию он повторяет массовую вставку или обновление для борьбы с условиями гонки и ошибками проверки.

    Сначала импорт

     import itertools as it from functools import partial from operator import itemgetter from sqlalchemy.exc import IntegrityError from app import session from models import Posts 

    Теперь пара вспомогательных функций

     def chunk(content, chunksize=None): """Groups data into chunks each with (at most) `chunksize` items. https://stackoverflow.com/a/22919323/408556 """ if chunksize: i = iter(content) generator = (list(it.islice(i, chunksize)) for _ in it.count()) else: generator = iter([content]) return it.takewhile(bool, generator) def gen_resources(records): """Yields a dictionary if the record's id already exists, a row object otherwise. """ ids = {item[0] for item in session.query(Posts.id)} for record in records: is_row = hasattr(record, 'to_dict') if is_row and record.id in ids: # It's a row but the id already exists, so we need to convert it # to a dict that updates the existing record. Since it is duplicate, # also yield True yield record.to_dict(), True elif is_row: # It's a row and the id doesn't exist, so no conversion needed. # Since it's not a duplicate, also yield False yield record, False elif record['id'] in ids: # It's a dict and the id already exists, so no conversion needed. # Since it is duplicate, also yield True yield record, True else: # It's a dict and the id doesn't exist, so we need to convert it. # Since it's not a duplicate, also yield False yield Posts(**record), False 

    И, наконец, функция upsert

     def upsert(data, chunksize=None): for records in chunk(data, chunksize): resources = gen_resources(records) sorted_resources = sorted(resources, key=itemgetter(1)) for dupe, group in it.groupby(sorted_resources, itemgetter(1)): items = [g[0] for g in group] if dupe: _upsert = partial(session.bulk_update_mappings, Posts) else: _upsert = session.add_all try: _upsert(items) session.commit() except IntegrityError: # A record was added or deleted after we checked, so retry # # modify accordingly by adding additional exceptions, eg, # except (IntegrityError, ValidationError, ValueError) db.session.rollback() upsert(items) except Exception as e: # Some other error occurred so reduce chunksize to isolate the # offending row(s) db.session.rollback() num_items = len(items) if num_items > 1: upsert(items, num_items // 2) else: print('Error adding record {}'.format(items[0])) 

    Вот как вы его используете

     >>> data = [ ... {'id': 1, 'text': 'updated post1'}, ... {'id': 5, 'text': 'updated post5'}, ... {'id': 1000, 'text': 'new post1000'}] ... >>> upsert(data) 

    Преимущество, которое у этого есть над bulk_save_objects заключается в том, что он может обрабатывать отношения, проверку ошибок и т. Д. На вставке (в отличие от массовых операций ).

    Давайте будем гением компьютера.